blob: 22412569092228b1726af10ae0ed9f39c7653803 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
# groups: gids of the groups that list the current user as a member, plus
# the gid of this process when os.getgid() exists.  Empty if the grp module
# cannot be imported.
try:
    import grp
    groups = [
        entry.gr_gid
        for entry in grp.getgrall()
        if getpass.getuser() in entry.gr_mem
    ]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []

# all_users: uid of every entry in the password database.  Empty if the pwd
# module cannot be imported or raises AttributeError (e.g. no getpwall()).
try:
    import pwd
    all_users = [entry.pw_uid for entry in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020064from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
# True when the effective uid of the test process is 0; always False on
# platforms that do not provide os.geteuid().
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
def requires_os_func(name):
    """Return a decorator skipping a test unless the os module has *name*."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
86
87
class _PathLike(os.PathLike):
    """Small os.PathLike implementation wrapping an arbitrary value.

    __fspath__() returns the wrapped value unchanged, unless that value is
    an exception instance, in which case the exception is raised instead.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        wrapped = self.path
        if not isinstance(wrapped, BaseException):
            return wrapped
        raise wrapped
101
102
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it, unbuffered.

    The file is opened in exclusive-creation mode ("x"), so the call fails
    with FileExistsError when *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
106
107
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000108# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions; most operate on support.TESTFN."""

    def setUp(self):
        # Also used as tearDown: remove TESTFN if it exists.  lexists() is
        # used so that a dangling symlink left by a test is removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of the path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Release the descriptor even when a check above fails, so it
            # is not leaked into tearDown and later tests.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first: fdopen_helper() opens it with O_RDONLY only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists: it must end up with the source's
        # content, and the source name must be gone.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # os.open() must accept all of its parameters as keywords.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # os.symlink() must accept all of its parameters as keywords.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
247
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249# Test attributes on return values from os.*stat* family.
250class StatAttributeTests(unittest.TestCase):
251 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200252 self.fname = support.TESTFN
253 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100254 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255
Serhiy Storchaka43767632013-11-03 21:31:38 +0200256 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000257 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
260 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(result[stat.ST_SIZE], 3)
262 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264 # Make sure all the attributes are there
265 members = dir(result)
266 for name in dir(stat):
267 if name[:3] == 'ST_':
268 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000269 if name.endswith("TIME"):
270 def trunc(x): return int(x)
271 else:
272 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000273 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000275 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
Larry Hastings6fe20b32012-04-19 15:07:49 -0700277 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700278 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700279 for name in 'st_atime st_mtime st_ctime'.split():
280 floaty = int(getattr(result, name) * 100000)
281 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700282 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700283
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 try:
285 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except IndexError:
288 pass
289
290 # Make sure that assignment fails
291 try:
292 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000294 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 pass
296
297 try:
298 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200299 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000300 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 pass
302
303 try:
304 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200305 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000306 except AttributeError:
307 pass
308
309 # Use the stat_result constructor with a too-short tuple.
310 try:
311 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200312 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000313 except TypeError:
314 pass
315
Ezio Melotti42da6632011-03-15 05:18:48 +0200316 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000317 try:
318 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
319 except TypeError:
320 pass
321
Antoine Pitrou38425292010-09-21 18:19:07 +0000322 def test_stat_attributes(self):
323 self.check_stat_attributes(self.fname)
324
325 def test_stat_attributes_bytes(self):
326 try:
327 fname = self.fname.encode(sys.getfilesystemencoding())
328 except UnicodeEncodeError:
329 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700330 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000331
Christian Heimes25827622013-10-12 01:27:08 +0200332 def test_stat_result_pickle(self):
333 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200334 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
335 p = pickle.dumps(result, proto)
336 self.assertIn(b'stat_result', p)
337 if proto < 4:
338 self.assertIn(b'cos\nstat_result\n', p)
339 unpickled = pickle.loads(p)
340 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200341
Serhiy Storchaka43767632013-11-03 21:31:38 +0200342 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000343 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700344 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345
346 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000347 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000348
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000349 # Make sure all the attributes are there.
350 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
351 'ffree', 'favail', 'flag', 'namemax')
352 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000353 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354
355 # Make sure that assignment really fails
356 try:
357 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200358 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000359 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000360 pass
361
362 try:
363 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200364 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 except AttributeError:
366 pass
367
368 # Use the constructor with a too-short tuple.
369 try:
370 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200371 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000372 except TypeError:
373 pass
374
Ezio Melotti42da6632011-03-15 05:18:48 +0200375 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000376 try:
377 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
378 except TypeError:
379 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000380
Christian Heimes25827622013-10-12 01:27:08 +0200381 @unittest.skipUnless(hasattr(os, 'statvfs'),
382 "need os.statvfs()")
383 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700384 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200385
Serhiy Storchakabad12572014-12-15 14:03:42 +0200386 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
387 p = pickle.dumps(result, proto)
388 self.assertIn(b'statvfs_result', p)
389 if proto < 4:
390 self.assertIn(b'cos\nstatvfs_result\n', p)
391 unpickled = pickle.loads(p)
392 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 def test_1686475(self):
396 # Verify that an open file can be stat'ed
397 try:
398 os.stat(r"c:\pagefile.sys")
399 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600400 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200401 except OSError as e:
402 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000403
Serhiy Storchaka43767632013-11-03 21:31:38 +0200404 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
405 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
406 def test_15261(self):
407 # Verify that stat'ing a closed fd does not cause crash
408 r, w = os.pipe()
409 try:
410 os.stat(r) # should not raise error
411 finally:
412 os.close(r)
413 os.close(w)
414 with self.assertRaises(OSError) as ctx:
415 os.stat(r)
416 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100417
Zachary Ware63f277b2014-06-19 09:46:37 -0500418 def check_file_attributes(self, result):
419 self.assertTrue(hasattr(result, 'st_file_attributes'))
420 self.assertTrue(isinstance(result.st_file_attributes, int))
421 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
422
423 @unittest.skipUnless(sys.platform == "win32",
424 "st_file_attributes is Win32 specific")
425 def test_file_attributes(self):
426 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
427 result = os.stat(self.fname)
428 self.check_file_attributes(result)
429 self.assertEqual(
430 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
431 0)
432
433 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200434 dirname = support.TESTFN + "dir"
435 os.mkdir(dirname)
436 self.addCleanup(os.rmdir, dirname)
437
438 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500439 self.check_file_attributes(result)
440 self.assertEqual(
441 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
442 stat.FILE_ATTRIBUTE_DIRECTORY)
443
Berker Peksag0b4dc482016-09-17 15:49:59 +0300444 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
445 def test_access_denied(self):
446 # Default to FindFirstFile WIN32_FIND_DATA when access is
447 # denied. See issue 28075.
448 # os.environ['TEMP'] should be located on a volume that
449 # supports file ACLs.
450 fname = os.path.join(os.environ['TEMP'], self.fname)
451 self.addCleanup(support.unlink, fname)
452 create_file(fname, b'ABC')
453 # Deny the right to [S]YNCHRONIZE on the file to
454 # force CreateFile to fail with ERROR_ACCESS_DENIED.
455 DETACHED_PROCESS = 8
456 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500457 # bpo-30584: Use security identifier *S-1-5-32-545 instead
458 # of localized "Users" to not depend on the locale.
459 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300460 creationflags=DETACHED_PROCESS
461 )
462 result = os.stat(fname)
463 self.assertNotEqual(result.st_size, 0)
464
Victor Stinner47aacc82015-06-12 17:26:23 +0200465
466class UtimeTests(unittest.TestCase):
467 def setUp(self):
468 self.dirname = support.TESTFN
469 self.fname = os.path.join(self.dirname, "f1")
470
471 self.addCleanup(support.rmtree, self.dirname)
472 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100473 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200474
Victor Stinner47aacc82015-06-12 17:26:23 +0200475 def support_subsecond(self, filename):
476 # Heuristic to check if the filesystem supports timestamp with
477 # subsecond resolution: check if float and int timestamps are different
478 st = os.stat(filename)
479 return ((st.st_atime != st[7])
480 or (st.st_mtime != st[8])
481 or (st.st_ctime != st[9]))
482
483 def _test_utime(self, set_time, filename=None):
484 if not filename:
485 filename = self.fname
486
487 support_subsecond = self.support_subsecond(filename)
488 if support_subsecond:
489 # Timestamp with a resolution of 1 microsecond (10^-6).
490 #
491 # The resolution of the C internal function used by os.utime()
492 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
493 # test with a resolution of 1 ns requires more work:
494 # see the issue #15745.
495 atime_ns = 1002003000 # 1.002003 seconds
496 mtime_ns = 4005006000 # 4.005006 seconds
497 else:
498 # use a resolution of 1 second
499 atime_ns = 5 * 10**9
500 mtime_ns = 8 * 10**9
501
502 set_time(filename, (atime_ns, mtime_ns))
503 st = os.stat(filename)
504
505 if support_subsecond:
506 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
507 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
508 else:
509 self.assertEqual(st.st_atime, atime_ns * 1e-9)
510 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
511 self.assertEqual(st.st_atime_ns, atime_ns)
512 self.assertEqual(st.st_mtime_ns, mtime_ns)
513
514 def test_utime(self):
515 def set_time(filename, ns):
516 # test the ns keyword parameter
517 os.utime(filename, ns=ns)
518 self._test_utime(set_time)
519
520 @staticmethod
521 def ns_to_sec(ns):
522 # Convert a number of nanosecond (int) to a number of seconds (float).
523 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
524 # issue, os.utime() rounds towards minus infinity.
525 return (ns * 1e-9) + 0.5e-9
526
527 def test_utime_by_indexed(self):
528 # pass times as floating point seconds as the second indexed parameter
529 def set_time(filename, ns):
530 atime_ns, mtime_ns = ns
531 atime = self.ns_to_sec(atime_ns)
532 mtime = self.ns_to_sec(mtime_ns)
533 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
534 # or utime(time_t)
535 os.utime(filename, (atime, mtime))
536 self._test_utime(set_time)
537
538 def test_utime_by_times(self):
539 def set_time(filename, ns):
540 atime_ns, mtime_ns = ns
541 atime = self.ns_to_sec(atime_ns)
542 mtime = self.ns_to_sec(mtime_ns)
543 # test the times keyword parameter
544 os.utime(filename, times=(atime, mtime))
545 self._test_utime(set_time)
546
547 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
548 "follow_symlinks support for utime required "
549 "for this test.")
550 def test_utime_nofollow_symlinks(self):
551 def set_time(filename, ns):
552 # use follow_symlinks=False to test utimensat(timespec)
553 # or lutimes(timeval)
554 os.utime(filename, ns=ns, follow_symlinks=False)
555 self._test_utime(set_time)
556
557 @unittest.skipUnless(os.utime in os.supports_fd,
558 "fd support for utime required for this test.")
559 def test_utime_fd(self):
560 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100561 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200562 # use a file descriptor to test futimens(timespec)
563 # or futimes(timeval)
564 os.utime(fp.fileno(), ns=ns)
565 self._test_utime(set_time)
566
567 @unittest.skipUnless(os.utime in os.supports_dir_fd,
568 "dir_fd support for utime required for this test.")
569 def test_utime_dir_fd(self):
570 def set_time(filename, ns):
571 dirname, name = os.path.split(filename)
572 dirfd = os.open(dirname, os.O_RDONLY)
573 try:
574 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
575 os.utime(name, dir_fd=dirfd, ns=ns)
576 finally:
577 os.close(dirfd)
578 self._test_utime(set_time)
579
580 def test_utime_directory(self):
581 def set_time(filename, ns):
582 # test calling os.utime() on a directory
583 os.utime(filename, ns=ns)
584 self._test_utime(set_time, filename=self.dirname)
585
586 def _test_utime_current(self, set_time):
587 # Get the system clock
588 current = time.time()
589
590 # Call os.utime() to set the timestamp to the current system clock
591 set_time(self.fname)
592
593 if not self.support_subsecond(self.fname):
594 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700595 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200596 # On Windows, the usual resolution of time.time() is 15.6 ms.
597 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700598 #
599 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
600 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200601 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200602 st = os.stat(self.fname)
603 msg = ("st_time=%r, current=%r, dt=%r"
604 % (st.st_mtime, current, st.st_mtime - current))
605 self.assertAlmostEqual(st.st_mtime, current,
606 delta=delta, msg=msg)
607
608 def test_utime_current(self):
609 def set_time(filename):
610 # Set to the current time in the new way
611 os.utime(self.fname)
612 self._test_utime_current(set_time)
613
614 def test_utime_current_old(self):
615 def set_time(filename):
616 # Set to the current time in the old explicit way.
617 os.utime(self.fname, None)
618 self._test_utime_current(set_time)
619
620 def get_file_system(self, path):
621 if sys.platform == 'win32':
622 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
623 import ctypes
624 kernel32 = ctypes.windll.kernel32
625 buf = ctypes.create_unicode_buffer("", 100)
626 ok = kernel32.GetVolumeInformationW(root, None, 0,
627 None, None, None,
628 buf, len(buf))
629 if ok:
630 return buf.value
631 # return None if the filesystem is unknown
632
633 def test_large_time(self):
634 # Many filesystems are limited to the year 2038. At least, the test
635 # pass with NTFS filesystem.
636 if self.get_file_system(self.dirname) != "NTFS":
637 self.skipTest("requires NTFS")
638
639 large = 5000000000 # some day in 2128
640 os.utime(self.fname, (large, large))
641 self.assertEqual(os.stat(self.fname).st_mtime, large)
642
643 def test_utime_invalid_arguments(self):
644 # seconds and nanoseconds parameters are mutually exclusive
645 with self.assertRaises(ValueError):
646 os.utime(self.fname, (5, 5), ns=(5, 5))
647
648
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000649from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000650
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000651class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000652 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000653 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000654
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000655 def setUp(self):
656 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000657 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000658 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000659 for key, value in self._reference().items():
660 os.environ[key] = value
661
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000662 def tearDown(self):
663 os.environ.clear()
664 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000665 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000666 os.environb.clear()
667 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000668
Christian Heimes90333392007-11-01 19:08:42 +0000669 def _reference(self):
670 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
671
672 def _empty_mapping(self):
673 os.environ.clear()
674 return os.environ
675
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000676 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200677 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
678 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000679 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000680 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300681 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200682 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300683 value = popen.read().strip()
684 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000685
Xavier de Gayed1415312016-07-22 12:15:29 +0200686 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
687 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000688 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200689 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
690 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300691 it = iter(popen)
692 self.assertEqual(next(it), "line1\n")
693 self.assertEqual(next(it), "line2\n")
694 self.assertEqual(next(it), "line3\n")
695 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000696
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000697 # Verify environ keys and values from the OS are of the
698 # correct str type.
699 def test_keyvalue_types(self):
700 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000701 self.assertEqual(type(key), str)
702 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000703
Christian Heimes90333392007-11-01 19:08:42 +0000704 def test_items(self):
705 for key, value in self._reference().items():
706 self.assertEqual(os.environ.get(key), value)
707
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000708 # Issue 7310
709 def test___repr__(self):
710 """Check that the repr() of os.environ looks like environ({...})."""
711 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000712 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
713 '{!r}: {!r}'.format(key, value)
714 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000715
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000716 def test_get_exec_path(self):
717 defpath_list = os.defpath.split(os.pathsep)
718 test_path = ['/monty', '/python', '', '/flying/circus']
719 test_env = {'PATH': os.pathsep.join(test_path)}
720
721 saved_environ = os.environ
722 try:
723 os.environ = dict(test_env)
724 # Test that defaulting to os.environ works.
725 self.assertSequenceEqual(test_path, os.get_exec_path())
726 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
727 finally:
728 os.environ = saved_environ
729
730 # No PATH environment variable
731 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
732 # Empty PATH environment variable
733 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
734 # Supplied PATH environment variable
735 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
736
Victor Stinnerb745a742010-05-18 17:17:23 +0000737 if os.supports_bytes_environ:
738 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000739 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000740 # ignore BytesWarning warning
741 with warnings.catch_warnings(record=True):
742 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000743 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000744 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000745 pass
746 else:
747 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000748
749 # bytes key and/or value
750 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
751 ['abc'])
752 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
753 ['abc'])
754 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
755 ['abc'])
756
757 @unittest.skipUnless(os.supports_bytes_environ,
758 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000759 def test_environb(self):
760 # os.environ -> os.environb
761 value = 'euro\u20ac'
762 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000763 value_bytes = value.encode(sys.getfilesystemencoding(),
764 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000765 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000766 msg = "U+20AC character is not encodable to %s" % (
767 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000768 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000769 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000770 self.assertEqual(os.environ['unicode'], value)
771 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000772
773 # os.environb -> os.environ
774 value = b'\xff'
775 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000776 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000777 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000779
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting a variable with an invalid name must raise an error."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_name, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_name, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, bad_name)
Victor Stinner60b385e2011-11-22 22:01:28 +0100793
Victor Stinner6d101392013-04-14 16:35:04 +0200794 def test_key_type(self):
795 missing = 'missingkey'
796 self.assertNotIn(missing, os.environ)
797
Victor Stinner839e5ea2013-04-14 16:43:03 +0200798 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200799 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200800 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200801 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200802
Victor Stinner839e5ea2013-04-14 16:43:03 +0200803 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200804 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200805 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200806 self.assertTrue(cm.exception.__suppress_context__)
807
def _test_environ_iteration(self, collection):
    """Add a key to os.environ while *collection* is being iterated.

    The iterator is advanced once before and once after the new key is
    stored; the key is removed again whatever the outcome.
    """
    marker_name = "__new_key__"
    marker_value = "test_environ_iteration"
    walker = iter(collection)

    # Start the iteration before the mapping is modified.
    next(walker)

    # add a new key in os.environ mapping
    os.environ[marker_name] = marker_value
    try:
        # Force the iterator to advance over the modified mapping.
        next(walker)
        self.assertEqual(os.environ[marker_name], marker_value)
    finally:
        del os.environ[marker_name]
822
823 def test_iter_error_when_changing_os_environ(self):
824 self._test_environ_iteration(os.environ)
825
826 def test_iter_error_when_changing_os_environ_items(self):
827 self._test_environ_iteration(os.environ.items())
828
829 def test_iter_error_when_changing_os_environ_values(self):
830 self._test_environ_iteration(os.environ.values())
831
Victor Stinner6d101392013-04-14 16:35:04 +0200832
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting os.fwalk()'s 'follow_symlinks' spelling."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Build the directory tree walked by every test.

        Sets self.walk_path, self.sub1_path, self.sub11_path,
        self.link_path and self.sub2_tree (the triple expected for SUB2).
        """
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5          (created on disk under the name "tmp3")
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            # SUB21 is created whether or not symlinks are available, so
            # it has to be listed here as well; it is dropped again below
            # when the directory ends up being removed.
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # The directory really is unreadable: keep it, but restore the
            # permissions at cleanup time so the tree can be deleted.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod(0) did not make the directory unreadable, so remove it
            # and drop "SUB21" (the first entry) from the expected dirs.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        """Walk the tree top-down and compare every yielded triple."""
        # Walk top-down.
        all = list(self.walk(self.walk_path))

        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = all[0][1][0] != "SUB1"
        all[0][1].sort()
        all[3 - 2 * flipped][-1].sort()
        all[3 - 2 * flipped][1].sort()
        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        """Removing 'SUB1' from dirs during a walk must prune that subtree.

        walk_path defaults to self.walk_path; test_file_like_path() passes
        a path-like object instead.
        """
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        all = []
        for root, dirs, files in self.walk(walk_path):
            all.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to all!
                dirs.remove('SUB1')

        self.assertEqual(len(all), 2)
        self.assertEqual(all[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        all[1][-1].sort()
        all[1][1].sort()
        self.assertEqual(all[1], self.sub2_tree)

    def test_file_like_path(self):
        """Repeat the pruning test with the top wrapped in _PathLike."""
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        """Walk the tree bottom-up and compare every yielded triple."""
        # Walk bottom-up.
        all = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(all), 4, all)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = all[3][1][0] != "SUB1"
        all[3][1].sort()
        all[2 - 2 * flipped][-1].sort()
        all[2 - 2 * flipped][1].sort()
        self.assertEqual(all[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(all[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        """With symlink following enabled, the walk must descend into 'link'."""
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        """A directory renamed mid-walk is reported via onerror and skipped."""
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it has been listed but before it is visited.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001007
Charles-François Natali7372b062012-02-05 15:15:38 +01001008
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt fwalk() to the (root, dirs, files) triples the base tests expect."""
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); subclasses override this hook."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and self.fwalk() are
        compared for every combination of topdown and symlink following.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        """fwalk() over TESTFN must agree with os.walk()."""
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """fwalk() with dir_fd pointing at '.' must agree with os.walk()."""
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and its error must not
        # be masked by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        """The descriptor yielded for each directory must refer to that directory."""
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        """Repeated fwalk() calls must not leak file descriptors."""
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for _ in range(256):
            for _entry in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1076
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Run os.walk() on a bytes path while exposing str results.

        After each yield the caller's (possibly modified) dirs and files
        lists are encoded and written back into the lists owned by
        os.walk().
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1089
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on a bytes path while exposing str results.

        After each yield the caller's (possibly modified) dirs and files
        lists are encoded and written back into the lists owned by
        os.fwalk().
        """
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(encoded_top, *args, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files, topfd)
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1101
Charles-François Natali7372b062012-02-05 15:15:38 +01001102
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        """Create nested directories, including paths containing os.curdir."""
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        """Check the modes of the leaf and of the intermediate directory."""
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
                self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)

    def test_exist_ok_existing_directory(self):
        """exist_ok=True tolerates an existing directory; otherwise OSError."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when an assertion
        # fails, so a failure here cannot leak 0o022 into later tests.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        """exist_ok=True must work whether or not S_ISGID is set on the target."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """makedirs() over a regular file raises OSError whatever exist_ok is."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() calls os.removedirs() on
            # the deepest existing path and would fail on a regular file.
            os.remove(path)

    def tearDown(self):
        """Remove whatever part of the dir1/.../dir6 chain was created."""
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1196
Andrew Svetlov405faed2012-12-25 12:18:09 +02001197
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory created once per class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values are rejected with TypeError."""
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        non_integers = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """Switch the group between the first two entries of 'groups'."""
        target = support.TESTFN
        gid_1, gid_2 = groups[:2]
        uid = os.stat(target).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """Switch the owner between the first two entries of 'all_users'."""
        target = support.TESTFN
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(target).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Changing the owner must raise PermissionError."""
        target = support.TESTFN
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(target).st_gid
        # Both calls share one context: the second is only reached when
        # the first one does not raise.
        with self.assertRaises(PermissionError):
            os.chown(target, uid_1, gid)
            os.chown(target, uid_2, gid)
1250
1251
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return both paths as (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With nothing in the way, the whole chain up to TESTFN is removed."""
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        """A file in the parent stops the removal after the leaf."""
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        """A file in the leaf makes removedirs() fail and remove nothing."""
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1291
1292
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        """Writing to os.devnull succeeds and reading from it yields no data."""
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            # Explicit close; leaving the with block then closes an
            # already-closed file.
            sink.close()
        with open(os.devnull, 'rb') as source:
            contents = source.read()
            self.assertEqual(contents, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001300
Andrew Svetlov405faed2012-12-25 12:18:09 +02001301
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """The result has exactly the requested number of bytes."""
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        """Two successive 16-byte results are bytes objects and differ."""
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return the bytes a child interpreter writes for os.urandom(count)."""
        script_lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()')
        code = '\n'.join(script_lines)
        result = assert_python_ok('-c', code)
        # Index 1 of the result is used as the child's standard output.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes must not produce the same bytes."""
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001331
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001332
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when the getrandom() syscall is unavailable."""
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        """The result is a bytes object of the requested size."""
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        """Requesting zero bytes yields an empty bytes object."""
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        """Only check that the GRND_RANDOM flag exists."""
        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        """A GRND_NONBLOCK call either succeeds or raises BlockingIOError."""
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        """Two successive 16-byte results differ."""
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1374
1375
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the corresponding build-time flags is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001382
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    # Tests for the implementation of os.urandom() that reads from a file
    # descriptor.  Every scenario runs in a child interpreter because it
    # manipulates the process-wide descriptor table.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)
        # The child writes exactly one 4-byte read to stdout.
        self.assertEqual(len(out), 4)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Within one run the two reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not reproduce the first one.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1460
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001461
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager replacing os.execv and os.execve with recording stubs.

    The value bound by ``with ... as`` is a list that receives one
    (function name, first arg, args) tuple per call.  Both stubs always
    raise, since a real exec call would never return.  When *defpath* is
    not None it temporarily replaces os.defpath.  The original os
    attributes are restored on exit.
    """
    recorded = []

    def mock_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = (os.execv, os.execve, os.defpath)
        os.execv, os.execve = mock_execv, mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1494
Victor Stinner4659ccf2016-09-14 10:57:00 +02001495
class ExecTests(unittest.TestCase):
    # Argument validation of the os.exec*() functions, plus tests of the
    # internal os._execvpe() helper run against mocked execv/execve.

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body of the os._execvpe() tests; *test_type* is str or
        # bytes and selects the type of the program name.
        search_dir = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)
1588
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001589
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test checks that an os function raises OSError when applied to
    # support.TESTFN in a state where the operation cannot succeed.

    def setUp(self):
        # The tests rely on support.TESTFN being absent: only
        # FileNotFoundError from os.stat() is accepted here.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create TESTFN as a regular file, then ask mkdir for the same name.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001624
Victor Stinnere77c9742016-03-25 10:28:23 +01001625
class TestInvalidFD(unittest.TestCase):
    # Check that os functions taking a file descriptor fail with EBADF when
    # they are given the descriptor returned by support.make_bad_fd().

    # Functions whose only argument is the descriptor; one test method is
    # generated for each name below.  "close" is left out because it
    # doesn't raise an exception on some platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(f):
        # Build the test method for os.<f>; the generated test does
        # nothing when the function is not available.
        def helper(self):
            func = getattr(os, f, None)
            if func is not None:
                self.check(func)
        return helper
    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        bad_fd = support.make_bad_fd()
        try:
            f(bad_fd, *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        first_fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(first_fd + i)
            except OSError:
                continue
            # first_fd + i is a valid descriptor: stop before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(first_fd, first_fd + i - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buffers = [bytearray(10)]
        self.check(os.readv, buffers)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1724
Brian Curtin1b9df392010-11-24 20:24:31 +00001725
1726class LinkTests(unittest.TestCase):
1727 def setUp(self):
1728 self.file1 = support.TESTFN
1729 self.file2 = os.path.join(support.TESTFN + "2")
1730
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001731 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001732 for file in (self.file1, self.file2):
1733 if os.path.exists(file):
1734 os.unlink(file)
1735
Brian Curtin1b9df392010-11-24 20:24:31 +00001736 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001737 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001738
xdegaye6a55d092017-11-12 17:57:04 +01001739 try:
1740 os.link(file1, file2)
1741 except PermissionError as e:
1742 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001743 with open(file1, "r") as f1, open(file2, "r") as f2:
1744 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1745
1746 def test_link(self):
1747 self._test_link(self.file1, self.file2)
1748
1749 def test_link_bytes(self):
1750 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1751 bytes(self.file2, sys.getfilesystemencoding()))
1752
Brian Curtinf498b752010-11-30 15:54:04 +00001753 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001754 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001755 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001756 except UnicodeError:
1757 raise unittest.SkipTest("Unable to encode for this platform.")
1758
Brian Curtinf498b752010-11-30 15:54:04 +00001759 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001760 self.file2 = self.file1 + "2"
1761 self._test_link(self.file1, self.file2)
1762
Serhiy Storchaka43767632013-11-03 21:31:38 +02001763@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1764class PosixUidGidTests(unittest.TestCase):
1765 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1766 def test_setuid(self):
1767 if os.getuid() != 0:
1768 self.assertRaises(OSError, os.setuid, 0)
1769 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001770
Serhiy Storchaka43767632013-11-03 21:31:38 +02001771 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1772 def test_setgid(self):
1773 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1774 self.assertRaises(OSError, os.setgid, 0)
1775 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001776
Serhiy Storchaka43767632013-11-03 21:31:38 +02001777 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1778 def test_seteuid(self):
1779 if os.getuid() != 0:
1780 self.assertRaises(OSError, os.seteuid, 0)
1781 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001782
Serhiy Storchaka43767632013-11-03 21:31:38 +02001783 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1784 def test_setegid(self):
1785 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1786 self.assertRaises(OSError, os.setegid, 0)
1787 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001788
Serhiy Storchaka43767632013-11-03 21:31:38 +02001789 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1790 def test_setreuid(self):
1791 if os.getuid() != 0:
1792 self.assertRaises(OSError, os.setreuid, 0, 0)
1793 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1794 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001795
Serhiy Storchaka43767632013-11-03 21:31:38 +02001796 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1797 def test_setreuid_neg1(self):
1798 # Needs to accept -1. We run this in a subprocess to avoid
1799 # altering the test runner's process state (issue8045).
1800 subprocess.check_call([
1801 sys.executable, '-c',
1802 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001803
Serhiy Storchaka43767632013-11-03 21:31:38 +02001804 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1805 def test_setregid(self):
1806 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1807 self.assertRaises(OSError, os.setregid, 0, 0)
1808 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1809 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001810
Serhiy Storchaka43767632013-11-03 21:31:38 +02001811 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1812 def test_setregid_neg1(self):
1813 # Needs to accept -1. We run this in a subprocess to avoid
1814 # altering the test runner's process state (issue8045).
1815 subprocess.check_call([
1816 sys.executable, '-c',
1817 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001818
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # Exercise os functions on a directory holding files whose names come
    # from the non-ASCII / unencodable test names provided by test.support.

    def setUp(self):
        # Pick the first available name for the directory itself.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names, in creation order; the optional ones are
        # only used when test.support defines them.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names that os.fsencode() can encode.
        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails: clean up here.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        for name in self.unicodefn:
            open(os.path.join(self.dir, name), 'rb').close()

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001890
Brian Curtineb24d742010-04-12 17:16:38 +00001891@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1892class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001893 def _kill(self, sig):
1894 # Start sys.executable as a subprocess and communicate from the
1895 # subprocess to the parent that the interpreter is ready. When it
1896 # becomes ready, send *sig* via os.kill to the subprocess and check
1897 # that the return code is equal to *sig*.
1898 import ctypes
1899 from ctypes import wintypes
1900 import msvcrt
1901
1902 # Since we can't access the contents of the process' stdout until the
1903 # process has exited, use PeekNamedPipe to see what's inside stdout
1904 # without waiting. This is done so we can tell that the interpreter
1905 # is started and running at a point where it could handle a signal.
1906 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1907 PeekNamedPipe.restype = wintypes.BOOL
1908 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1909 ctypes.POINTER(ctypes.c_char), # stdout buf
1910 wintypes.DWORD, # Buffer size
1911 ctypes.POINTER(wintypes.DWORD), # bytes read
1912 ctypes.POINTER(wintypes.DWORD), # bytes avail
1913 ctypes.POINTER(wintypes.DWORD)) # bytes left
1914 msg = "running"
1915 proc = subprocess.Popen([sys.executable, "-c",
1916 "import sys;"
1917 "sys.stdout.write('{}');"
1918 "sys.stdout.flush();"
1919 "input()".format(msg)],
1920 stdout=subprocess.PIPE,
1921 stderr=subprocess.PIPE,
1922 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001923 self.addCleanup(proc.stdout.close)
1924 self.addCleanup(proc.stderr.close)
1925 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001926
1927 count, max = 0, 100
1928 while count < max and proc.poll() is None:
1929 # Create a string buffer to store the result of stdout from the pipe
1930 buf = ctypes.create_string_buffer(len(msg))
1931 # Obtain the text currently in proc.stdout
1932 # Bytes read/avail/left are left as NULL and unused
1933 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1934 buf, ctypes.sizeof(buf), None, None, None)
1935 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1936 if buf.value:
1937 self.assertEqual(msg, buf.value.decode())
1938 break
1939 time.sleep(0.1)
1940 count += 1
1941 else:
1942 self.fail("Did not receive communication from the subprocess")
1943
Brian Curtineb24d742010-04-12 17:16:38 +00001944 os.kill(proc.pid, sig)
1945 self.assertEqual(proc.wait(), sig)
1946
1947 def test_kill_sigterm(self):
1948 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001949 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001950
1951 def test_kill_int(self):
1952 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001953 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001954
1955 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001956 tagname = "test_os_%s" % uuid.uuid1()
1957 m = mmap.mmap(-1, 1, tagname)
1958 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001959 # Run a script which has console control handling enabled.
1960 proc = subprocess.Popen([sys.executable,
1961 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001962 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001963 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1964 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001965 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001966 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001967 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001968 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001969 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001970 count += 1
1971 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001972 # Forcefully kill the process if we weren't able to signal it.
1973 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001974 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001975 os.kill(proc.pid, event)
1976 # proc.send_signal(event) could also be done here.
1977 # Allow time for the signal to be passed and the process to exit.
1978 time.sleep(0.5)
1979 if not proc.poll():
1980 # Forcefully kill the process if we weren't able to signal it.
1981 os.kill(proc.pid, signal.SIGINT)
1982 self.fail("subprocess did not stop on {}".format(name))
1983
@unittest.skip("subprocesses aren't inheriting Ctrl+C property")
def test_CTRL_C_EVENT(self):
    # Re-enable Ctrl+C handling in this process, then check that a child
    # stops when it is sent CTRL_C_EVENT.
    import ctypes
    from ctypes import wintypes

    int_pointer = ctypes.POINTER(ctypes.c_int)
    set_ctrl_handler = ctypes.windll.kernel32.SetConsoleCtrlHandler
    set_ctrl_handler.argtypes = (int_pointer, wintypes.BOOL)
    set_ctrl_handler.restype = wintypes.BOOL

    # Make a NULL value by creating a pointer with no argument.
    null_handler = int_pointer()

    # Calling this with NULL and FALSE causes the calling process to
    # handle Ctrl+C, rather than ignore it. This property is inherited
    # by subprocesses.
    set_ctrl_handler(null_handler, 0)

    self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2002
def test_CTRL_BREAK_EVENT(self):
    # The child must stop when it is sent CTRL_BREAK_EVENT.
    event = signal.CTRL_BREAK_EVENT
    self._kill_with_event(event, "CTRL_BREAK_EVENT")
2005
2006
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Fill TESTFN with two directories (SUB0, SUB1) and two files
        # (FILE0, FILE1); remember the sorted entry names for the tests.
        self.created_paths = entries = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            entries += [dir_name, file_name]
        entries.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def _check_listdir(self, path, expected):
        # listdir(path) must yield exactly `expected`, ignoring order.
        self.assertEqual(sorted(os.listdir(path)), expected)

    def _created_paths_as_bytes(self):
        # The names recorded by setUp(), encoded like listdir(bytes) output.
        return [os.fsencode(name) for name in self.created_paths]

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        self._check_listdir(support.TESTFN, self.created_paths)

        # bytes
        self._check_listdir(os.fsencode(support.TESTFN),
                            self._created_paths_as_bytes())

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        self._check_listdir('\\\\?\\' + absolute, self.created_paths)

        # bytes
        self._check_listdir(b'\\\\?\\' + os.fsencode(absolute),
                            self._created_paths_as_bytes())
Tim Golden781bbeb2013-10-25 20:24:06 +01002053
2054
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    # Link names (relative to the current directory) and the existing
    # targets they point at: this file and the directory containing it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # The targets must exist and no link may be left from an earlier run.
        for target in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(target)
        for link in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(link)

    def tearDown(self):
        # (presence check, removal function, link name) for every link a
        # test may have created.
        cleanup_plan = (
            (os.path.exists, os.remove, self.filelink),
            (os.path.exists, os.rmdir, self.dirlink),
            (os.path.lexists, os.remove, self.missing_link),
        )
        for is_present, remove, link in cleanup_plan:
            if is_present(link):
                remove(link)

    def test_directory_link(self):
        link = self.dirlink
        os.symlink(self.dirlink_target, link)
        for predicate in (os.path.exists, os.path.isdir, os.path.islink):
            self.assertTrue(predicate(link))
        self.check_stat(link, self.dirlink_target)

    def test_file_link(self):
        link = self.filelink
        os.symlink(self.filelink_target, link)
        for predicate in (os.path.exists, os.path.isfile, os.path.islink):
            self.assertTrue(predicate(link))
        self.check_stat(link, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        os.symlink(target, self.missing_link, target_is_dir=True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() must follow the link to `target` while lstat() must not,
        # for both the str and the bytes spelling of the link name.
        for link_name in (link, os.fsencode(link)):
            self.assertEqual(os.stat(link_name), os.stat(target))
            self.assertNotEqual(os.lstat(link_name), os.stat(link_name))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        for directory in (level1, level2, level3):
            os.mkdir(directory)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls through a relative path, first from the
            # parent of the link's directory (level1), then from a
            # subdirectory of it (level3).
            for working_dir in (level1, level3):
                os.chdir(working_dir)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002164
Brian Curtind40e6f72010-07-08 21:39:08 +00002165
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # The junction is created in the current directory and points at the
    # directory that contains this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def _create_junction(self):
        # Create the junction and check that it is visible afterwards.
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

    def test_create_junction(self):
        self._create_junction()
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        self._create_junction()

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2195
2196
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion, not a bare `assert`: the latter is
        # stripped under `python -O`, which would make this test check
        # nothing at all.
        self.assertTrue(os.path.isdir(src))
2227
2228
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # fsencode() must return bytes unchanged and fsdecode() must return
        # str unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: nothing to
                # round-trip for this name.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002242
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002243
Brett Cannonefb00c02012-02-29 18:31:31 -05002244
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo')
                and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # For a tty on fd 0 the reported encoding must be a name that the
        # codecs registry can resolve.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2258
2259
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002260class PidTests(unittest.TestCase):
2261 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2262 def test_getppid(self):
2263 p = subprocess.Popen([sys.executable, '-c',
2264 'import os; print(os.getppid())'],
2265 stdout=subprocess.PIPE)
2266 stdout, _ = p.communicate()
2267 # We are the parent of our subprocess
2268 self.assertEqual(int(stdout), os.getpid())
2269
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002270 def test_waitpid(self):
2271 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002272 # Add an implicit test for PyUnicode_FSConverter().
2273 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002274 status = os.waitpid(pid, 0)
2275 self.assertEqual(status, (pid, 0))
2276
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002277
class SpawnTests(unittest.TestCase):
    def create_args(self, *, with_env=False, use_bytes=False):
        # Write a small script to TESTFN that exits with self.exitcode and
        # return the argv list [sys.executable, TESTFN] that runs it.
        #
        # with_env  -- also build self.env (a copy of os.environ plus one
        #              unique variable, self.key) and make the script read
        #              that variable before exiting.
        # use_bytes -- return argv as bytes and, when self.env was built,
        #              convert its keys and values to bytes too.
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; converting
            # it unconditionally would raise AttributeError.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        waited_pid, status = os.waitpid(pid, 0)
        self.assertEqual(waited_pid, pid)
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # No argument at all, or an empty first argument, must be rejected.
        args = self.create_args()
        for argv in ((), ('',)):
            self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0],
                              *argv)

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        for argv in ((), ('',)):
            self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0],
                              *argv, {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        for argv in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0],
                              argv)

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        for argv in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0],
                              argv, {})

    def _test_invalid_env(self, spawn):
        # `spawn` is called as spawn(mode, path, args, env).  For each
        # malformed environment entry it must either raise ValueError or
        # return exit code 127; an '=' inside a value is legal and must
        # reach the child unchanged.
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # (description, variable name, variable value)
            ("null character in the environment variable name",
             "FRUIT\0VEGETABLE", "cabbage"),
            ("null character in the environment variable value",
             "FRUIT", "orange\0VEGETABLE=cabbage"),
            ("equal character in the environment variable name",
             "FRUIT=ORANGE", "lemon"),
        )
        for description, key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127, description)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2456
Serhiy Storchaka77703942017-06-25 07:33:01 +03002457
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # getlogin() must return a non-empty name.
        login_length = len(os.getlogin())
        self.assertNotEqual(login_length, 0)
2466
2467
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, read it back, then try to
        # restore the original value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # Failing to restore with EACCES is tolerated; anything
                # else is a real error.
                if err.errno != errno.EACCES:
                    raise
2490
2491
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening TCP server that runs an asyncore loop in its own thread.

    Each accepted connection is wrapped in a Handler, which greets the
    client with b"220 ready\\r\\n" and accumulates everything it receives.
    The most recent Handler is available as `handler_instance`.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that records all received bytes."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []     # chunks received so far, in order
            self.closed = False     # set once handle_close() has run
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            self.in_buffer.append(data)

        def get_data(self):
            """Return everything received so far as a single bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled instead of
            # letting asyncore swallow it.
            raise

    def __init__(self, address):
        """Bind a listening socket to `address`, a (host, port) pair."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Read the address back from the socket rather than trusting the
        # argument.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and return once run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop to exit and wait for the thread to finish."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # `with` guarantees the lock is released even when the loop
            # raises, which it can because handle_error() re-raises.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2574
2575
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002576@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2577class TestSendfile(unittest.TestCase):
2578
Victor Stinner8c663fd2017-11-08 14:44:44 -08002579 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002580 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002581 not sys.platform.startswith("solaris") and \
2582 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002583 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2584 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002585
@classmethod
def setUpClass(cls):
    # Keep the value returned by threading_setup() for tearDownClass(),
    # then write DATA to the file shared by all tests of the class.
    thread_key = support.threading_setup()
    cls.key = thread_key
    create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002590
@classmethod
def tearDownClass(cls):
    # Undo setUpClass(): hand the saved key back to threading_cleanup()
    # and delete the shared data file.
    saved_key = cls.key
    support.threading_cleanup(*saved_key)
    support.unlink(support.TESTFN)
2595
def setUp(self):
    # Start a server on a free port, connect a client to it, and open the
    # data file; the raw descriptors are what os.sendfile() operates on.
    self.server = server = SendfileTestServer((support.HOST, 0))
    server.start()

    self.client = client = socket.socket()
    client.connect((server.host, server.port))
    client.settimeout(1)
    # synchronize by waiting for "220 ready" response
    client.recv(1024)
    self.sockno = client.fileno()

    self.file = data_file = open(support.TESTFN, 'rb')
    self.fileno = data_file.fileno()
2607
def tearDown(self):
    # Close our side first, then stop the server thread if a test has not
    # already done so, and drop the reference to it.
    for closable in (self.file, self.client):
        closable.close()
    server = self.server
    if server.running:
        server.stop()
    self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002614
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                     trailers=None):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    Calls os.sendfile(sock, file, offset, nbytes) and returns its result,
    retrying for as long as the call fails with EAGAIN or EBUSY.  Any
    other OSError (including ECONNRESET, i.e. the peer disconnected) is
    propagated to the caller.

    `headers` and `trailers` are forwarded only when
    self.SUPPORT_HEADERS_TRAILERS is true; omitting them (or passing
    None) is the same as passing an empty list.
    """
    # None instead of a shared mutable [] default; build fresh lists here.
    headers = [] if headers is None else headers
    trailers = [] if trailers is None else trailers
    while True:
        try:
            if self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes, headers,
                                   trailers)
            return os.sendfile(sock, file, offset, nbytes)
        except OSError as err:
            if err.errno not in (errno.EAGAIN, errno.EBUSY):
                # disconnected, or any other non-transient failure
                raise
            # we have to retry send data
2635
def test_send_whole_file(self):
    # normal send
    expected_size = len(self.DATA)
    chunk_size = 4096
    position = 0
    total_sent = 0
    while total_sent < expected_size:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, position,
                                     chunk_size)
        if sent == 0:
            break
        position += sent
        total_sent += sent
        self.assertTrue(sent <= chunk_size)
        self.assertEqual(position, total_sent)

    self.assertEqual(total_sent, expected_size)
    # Closing our end lets the server see the end of the stream.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(len(received), expected_size)
    self.assertEqual(received, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002657
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset
    halfway = len(self.DATA) // 2
    remaining = len(self.DATA) - halfway
    chunk_size = 4096
    position = halfway
    total_sent = 0
    while total_sent < remaining:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, position,
                                     chunk_size)
        if sent == 0:
            break
        position += sent
        total_sent += sent
        self.assertTrue(sent <= chunk_size)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    # Only the second half of the file should have been transmitted.
    expected = self.DATA[len(self.DATA) // 2:]
    self.assertEqual(total_sent, len(expected))
    self.assertEqual(len(received), len(expected))
    self.assertEqual(received, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002680
def test_offset_overflow(self):
    # specify an offset > file size
    past_the_end = len(self.DATA) + 4096
    try:
        sent = os.sendfile(self.sockno, self.fileno, past_the_end, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(sent, 0)
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    # Nothing may have reached the server.
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
2697
def test_invalid_offset(self):
    # A negative offset must be rejected with EINVAL.
    negative_offset = -1
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, negative_offset, 4096)
    failure = caught.exception
    self.assertEqual(failure.errno, errno.EINVAL)
2702
def test_keywords(self):
    # Keyword arguments should be supported
    # 'in' is a reserved word, so it can only be supplied through a dict.
    all_keywords = {'out': self.sockno, 'offset': 0, 'count': 4096,
                    'in': self.fileno}
    os.sendfile(**all_keywords)

    if not self.SUPPORT_HEADERS_TRAILERS:
        return
    os.sendfile(self.sockno, self.fileno,
                offset=0, count=4096,
                headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002710
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002711 # --- headers / trailers tests
2712
@requires_headers_trailers
def test_headers(self):
    # Send a 512-byte header followed by the whole file and verify that the
    # server receives header + file content.
    header = b"x" * 512
    expected_data = header + self.DATA

    total_sent = 0
    # First call: the header plus the first 4096 bytes of the file.
    sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                       headers=[header])
    total_sent += sent
    offset = 4096
    nbytes = 4096
    # Remaining calls: plain file data until sendfile() reports EOF.
    while True:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        total_sent += sent
        offset += sent

    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Compare the length first (cheap, readable failure message) and then
    # the payload itself, like the other tests in this class, instead of
    # comparing opaque hash() values.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
2735
@requires_headers_trailers
def test_trailers(self):
    # Send a tiny file with a trailer appended and check the concatenation
    # received by the server.
    payload = b"abcdef"
    scratch = support.TESTFN + "2"

    self.addCleanup(support.unlink, scratch)
    create_file(scratch, payload)

    with open(scratch, 'rb') as src:
        os.sendfile(self.sockno, src.fileno(), 0, len(payload),
                    trailers=[b"1234"])
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002751
@requires_headers_trailers
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                     'test needs os.SF_NODISKIO')
def test_flags(self):
    # With SF_NODISKIO the call may legitimately fail with EBUSY or EAGAIN;
    # any other error is a real failure.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno in tolerated:
            return
        raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002762
2763
def supports_extended_attributes():
    # Report whether extended attributes can actually be set on a file
    # created at support.TESTFN.  The probe file is always removed.
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        # "x" mode: fail rather than reuse an existing file.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07002778
2779
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Exercises os.getxattr(), os.setxattr(), os.removexattr() and
    # os.listxattr() on a scratch file, through paths, through paths with
    # follow_symlinks=False, and through file descriptors.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the full attribute life cycle on support.TESTFN.
        #
        # s         -- converts an attribute name given as str into the type
        #              under test (str itself or os.fsencode)
        # getxattr, setxattr, removexattr, listxattr
        #           -- the callables under test; each takes the file name
        #              as first argument
        # kwargs    -- extra keyword arguments forwarded to every one of the
        #              four callables (e.g. follow_symlinks=False)
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        # A fresh file has no "user.test" attribute.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        # The file may already carry attributes (set by the system); they
        # are the baseline for all following comparisons.
        init_xattr = listxattr(fn, **kwargs)
        self.assertIsInstance(init_xattr, list)

        setxattr(fn, s("user.test"), b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn, **kwargs)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE must refuse to overwrite an existing attribute.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # XATTR_REPLACE must refuse to create a missing attribute.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn, **kwargs)), xattr)
        removexattr(fn, s("user.test"), **kwargs)

        # Once removed, the attribute is gone again.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn, **kwargs)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        # A larger (1 KiB) value round-trips unchanged.
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        # Many attributes at once ("user.test2" is set again as part of it).
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn, **kwargs)),
                         set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the checks once with str attribute names and once with bytes
        # names, starting from a fresh file each time.
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)

        self._check_xattrs_str(os.fsencode, *args, **kwargs)
        support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each function so that it receives a file descriptor instead
        # of the path handed over by _check_xattrs_str().
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2863
2864
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _get_terminal_size(self, *args):
        # Return os.get_terminal_size(*args); skip the running test when the
        # size cannot be queried because no terminal is attached.  Any other
        # OSError is re-raised.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stty complains on stderr when stdin is not a terminal; that
            # case is handled below, so keep the noise out of the test log.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        actual = self._get_terminal_size(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2908
2909
class OSErrorTests(unittest.TestCase):
    # Path-taking os functions called on a missing file must raise an
    # OSError whose .filename is the very object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        # Prefer the deliberately awkward names when the platform has them.
        text_name = (support.TESTFN_UNENCODABLE
                     if support.TESTFN_UNENCODABLE is not None
                     else support.TESTFN)
        raw_name = (support.TESTFN_UNDECODABLE
                    if support.TESTFN_UNDECODABLE is not None
                    else os.fsencode(support.TESTFN))

        self.unicode_filenames = [text_name, Str(text_name)]
        self.bytes_filenames = [raw_name,
                                bytearray(raw_name),
                                memoryview(raw_name)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        raw = self.bytes_filenames
        text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each row: (names to try, function, *extra positional arguments).
        cases = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            # The destination type has to match the source type here.
            cases += [
                (raw, os.rename, b"dst"),
                (raw, os.replace, b"dst"),
                (text, os.rename, "dst"),
                (text, os.replace, "dst"),
                (text, os.listdir),
            ]
        else:
            cases += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions: only tested where the platform provides them.
        if hasattr(os, "chown"):
            cases.append((every, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            cases.append((every, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            cases.append((every, os.truncate, 0))
        if hasattr(os, "chflags"):
            cases.append((every, os.chflags, 0))
        if hasattr(os, "lchflags"):
            cases.append((every, os.lchflags, 0))
        if hasattr(os, "chroot"):
            cases.append((every, os.chroot))
        if hasattr(os, "link"):
            if on_windows:
                cases.append((raw, os.link, b"dst"))
                cases.append((text, os.link, "dst"))
            else:
                cases.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            cases.append((text if on_windows else every, os.readlink))

        for candidates, func, *extra in cases:
            for name in candidates:
                # Anything that is neither str nor bytes (bytearray,
                # memoryview) is expected to trigger a DeprecationWarning.
                plain = isinstance(name, (str, bytes))
                try:
                    if plain:
                        func(name, *extra)
                    else:
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                except UnicodeDecodeError:
                    pass
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail("No exception thrown by {}".format(func))
3005
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (unknown) or a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3014
Victor Stinnerdaf45552013-08-28 00:53:59 +02003015
class FDInheritanceTests(unittest.TestCase):
    # File descriptors created through the os module are expected to be
    # non-inheritable unless explicitly requested otherwise.

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed
        # automatically when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(handle, fcntl.F_GETFD)
        fcntl.fcntl(handle, fcntl.F_SETFD, fd_flags & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = self._open_this_file()
        cloexec_bit = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

        os.set_inheritable(handle, True)
        cloexec_bit = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, 0)

    def test_open(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        for end in (read_end, write_end):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        # (keyword arguments for dup2, expected inheritability):
        # inheritable by default, then forced non-inheritable.
        scenarios = (
            ({}, True),
            ({'inheritable': False}, False),
        )
        for dup2_kwargs, expected in scenarios:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source, target, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for pty_fd in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(pty_fd), False)
3098
3099
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        text_name = support.TESTFN
        if os.name == 'nt':
            # No bytes variants are tried on Windows.
            raw_name = None
            raw_pathlike = None
        else:
            raw_name = support.TESTFN.encode('ascii')
            raw_pathlike = _PathLike(raw_name)
        fd = os.open(_PathLike(text_name), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        # A path-like object wrapping an int must be rejected everywhere.
        fd_pathlike = _PathLike(fd)
        text_pathlike = _PathLike(text_name)

        valid_paths = [candidate
                       for candidate in (text_name, raw_name,
                                         text_pathlike, raw_pathlike)
                       if candidate is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Function not available on this platform.
                    continue

                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        outcome = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(outcome)

                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(fd_pathlike, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    outcome = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(outcome)
3155
3156
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # A freshly opened descriptor is in blocking mode.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch to non-blocking and back, reading the flag after each change.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
3170
3171
Yury Selivanov97e2e062014-09-26 12:33:06 -04003172
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3177
3178
Victor Stinner6036e442015-03-08 01:58:04 +01003179class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003180 check_no_resource_warning = support.check_no_resource_warning
3181
def setUp(self):
    # Every test works inside a fresh scratch directory, known both as str
    # and as bytes, that is removed again afterwards.
    scratch_dir = os.path.realpath(support.TESTFN)
    self.path = scratch_dir
    self.bytes_path = os.fsencode(scratch_dir)
    self.addCleanup(support.rmtree, scratch_dir)
    os.mkdir(scratch_dir)
3187
def create_file(self, name="file.txt"):
    # Create a small file called *name* in the scratch directory and return
    # its full path.  The result has the same type (str or bytes) as *name*.
    if isinstance(name, bytes):
        directory = self.bytes_path
    else:
        directory = self.path
    full_name = os.path.join(directory, name)
    create_file(full_name, b'python')
    return full_name
3193
def get_entries(self, names):
    # Scan the scratch directory, check that it holds exactly *names*
    # (a sorted list) and return a {name: DirEntry} mapping.
    by_name = {item.name: item for item in os.scandir(self.path)}
    self.assertEqual(sorted(by_name), names)
    return by_name
3199
def assert_stat_equal(self, stat1, stat2, skip_fields):
    # Compare two stat results.  With *skip_fields* true, compare the st_*
    # attributes one by one and leave out st_dev, st_ino and st_nlink;
    # otherwise compare the two objects as a whole.
    if not skip_fields:
        self.assertEqual(stat1, stat2)
        return

    ignored = ("st_dev", "st_ino", "st_nlink")
    for field in dir(stat1):
        if field.startswith("st_") and field not in ignored:
            self.assertEqual(getattr(stat1, field),
                             getattr(stat2, field),
                             (stat1, stat2, field))
3212
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
    # Cross-check everything a DirEntry reports against os.stat(),
    # os.lstat() and os.path for the same path.
    self.assertIsInstance(entry, os.DirEntry)
    self.assertEqual(entry.name, name)
    self.assertEqual(entry.path, os.path.join(self.path, name))
    self.assertEqual(entry.inode(), os.lstat(entry.path).st_ino)

    # Symlink-following view.
    followed = os.stat(entry.path)
    followed_mode = followed.st_mode
    self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed_mode))
    self.assertEqual(entry.is_file(), stat.S_ISREG(followed_mode))
    self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

    # View of the entry itself (symlinks not followed).
    unfollowed = os.lstat(entry.path)
    unfollowed_mode = unfollowed.st_mode
    self.assertEqual(entry.is_dir(follow_symlinks=False),
                     stat.S_ISDIR(unfollowed_mode))
    self.assertEqual(entry.is_file(follow_symlinks=False),
                     stat.S_ISREG(unfollowed_mode))

    # On Windows some stat fields are skipped by assert_stat_equal().
    windows = os.name == 'nt'
    self.assert_stat_equal(entry.stat(), followed,
                           windows and not is_symlink)
    self.assert_stat_equal(entry.stat(follow_symlinks=False), unfollowed,
                           windows)
3240
def test_attributes(self):
    can_link = hasattr(os, 'link')
    can_symlink = support.can_symlink()

    dir_path = os.path.join(self.path, "dir")
    os.mkdir(dir_path)
    file_path = self.create_file("file.txt")

    # Rows of (entry name, is_dir, is_file, is_symlink), kept in sorted
    # name order because get_entries() compares against a sorted listing.
    expected = [
        ('dir', True, False, False),
        ('file.txt', False, True, False),
    ]
    if can_link:
        try:
            os.link(file_path, os.path.join(self.path, "link_file.txt"))
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        expected.append(('link_file.txt', False, True, False))
    if can_symlink:
        os.symlink(dir_path, os.path.join(self.path, "symlink_dir"),
                   target_is_directory=True)
        os.symlink(file_path, os.path.join(self.path, "symlink_file.txt"))
        expected.append(('symlink_dir', True, False, True))
        expected.append(('symlink_file.txt', False, True, True))

    entries = self.get_entries([row[0] for row in expected])
    for entry_name, is_dir, is_file, is_symlink in expected:
        self.check_entry(entries[entry_name], entry_name,
                         is_dir, is_file, is_symlink)
3281
def get_entry(self, name):
    # Return the single DirEntry of the scratch directory, which must be
    # called *name*.  A bytes *name* selects the bytes form of the path.
    if isinstance(name, bytes):
        directory = self.bytes_path
    else:
        directory = self.path
    found = list(os.scandir(directory))
    self.assertEqual(len(found), 1)

    only = found[0]
    self.assertEqual(only.name, name)
    return only
3290
def create_file_entry(self, name='file.txt'):
    # Create the file *name* and return the DirEntry scandir() yields for it.
    created = self.create_file(name=name)
    base = os.path.basename(created)
    return self.get_entry(base)
3294
def test_current_directory(self):
    created = self.create_file()
    saved_cwd = os.getcwd()
    try:
        os.chdir(self.path)

        # call scandir() without parameter: it must list the content
        # of the current directory
        listing = {item.name: item for item in os.scandir()}
        self.assertEqual(sorted(listing), [os.path.basename(created)])
    finally:
        # Always go back, whatever happened above.
        os.chdir(saved_cwd)
3308
def test_repr(self):
    # repr() of a DirEntry shows the entry name.
    file_entry = self.create_file_entry()
    shown = repr(file_entry)
    self.assertEqual(shown, "<DirEntry 'file.txt'>")
3312
def test_fspath_protocol(self):
    # os.fspath() of a DirEntry is the entry's full path.
    file_entry = self.create_file_entry()
    full_path = os.path.join(self.path, 'file.txt')
    self.assertEqual(os.fspath(file_entry), full_path)
3316
def test_fspath_protocol_bytes(self):
    # An entry obtained from a bytes path yields a bytes fspath.
    raw_name = os.fsencode('bytesfile.txt')
    raw_entry = self.create_file_entry(name=raw_name)
    result = os.fspath(raw_entry)
    self.assertIsInstance(result, bytes)
    wanted = os.path.join(os.fsencode(self.path), raw_name)
    self.assertEqual(result, wanted)
3324
def test_removed_dir(self):
    # Query a DirEntry after the directory it describes has been removed.
    windows = os.name == 'nt'
    target = os.path.join(self.path, 'dir')

    os.mkdir(target)
    entry = self.get_entry('dir')
    os.rmdir(target)

    # On POSIX, is_dir() result depends if scandir() filled d_type or not
    if windows:
        self.assertTrue(entry.is_dir())
    self.assertFalse(entry.is_file())
    self.assertFalse(entry.is_symlink())

    if not windows:
        self.assertGreater(entry.inode(), 0)
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        with self.assertRaises(FileNotFoundError):
            entry.stat(follow_symlinks=False)
    else:
        with self.assertRaises(FileNotFoundError):
            entry.inode()
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
3346
def test_removed_file(self):
    # Query a DirEntry after the file it describes has been deleted.
    windows = os.name == 'nt'
    entry = self.create_file_entry()
    os.unlink(entry.path)

    self.assertFalse(entry.is_dir())
    # On POSIX, is_file() result depends if scandir() filled d_type or not
    if windows:
        self.assertTrue(entry.is_file())
    self.assertFalse(entry.is_symlink())

    if not windows:
        self.assertGreater(entry.inode(), 0)
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        with self.assertRaises(FileNotFoundError):
            entry.stat(follow_symlinks=False)
    else:
        with self.assertRaises(FileNotFoundError):
            entry.inode()
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
3365
def test_broken_symlink(self):
    if not support.can_symlink():
        return self.skipTest('cannot create symbolic link')

    # Create a file plus a symlink to it, scan, then delete the file so
    # that the already-scanned symlink entry dangles.
    target = self.create_file("file.txt")
    link_path = os.path.join(self.path, "symlink.txt")
    os.symlink(target, link_path)
    link_entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
    os.unlink(target)

    self.assertGreater(link_entry.inode(), 0)
    self.assertFalse(link_entry.is_dir())
    self.assertFalse(link_entry.is_file())  # broken symlink returns False
    self.assertFalse(link_entry.is_dir(follow_symlinks=False))
    self.assertFalse(link_entry.is_file(follow_symlinks=False))
    self.assertTrue(link_entry.is_symlink())
    with self.assertRaises(FileNotFoundError):
        link_entry.stat()
    # don't fail
    link_entry.stat(follow_symlinks=False)
3386
def test_bytes(self):
    # Scanning a bytes path yields bytes names and bytes paths.
    self.create_file("file.txt")

    raw_dir = os.fsencode(self.path)
    found = list(os.scandir(raw_dir))
    self.assertEqual(len(found), 1, found)
    only = found[0]

    self.assertEqual(only.name, b'file.txt')
    wanted_path = os.fsencode(os.path.join(self.path, 'file.txt'))
    self.assertEqual(only.path, wanted_path)
3398
def test_bytes_like(self):
    """Bytes-like paths emit DeprecationWarning and give bytes results."""
    self.create_file("file.txt")
    encoded_dir = os.fsencode(self.path)
    expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))

    for wrapper in (bytearray, memoryview):
        bytes_like_path = wrapper(encoded_dir)
        with self.assertWarns(DeprecationWarning):
            found = list(os.scandir(bytes_like_path))
        self.assertEqual(len(found), 1, found)
        only_entry = found[0]

        self.assertEqual(only_entry.name, b'file.txt')
        self.assertEqual(only_entry.path, expected_path)
        # The results must be exactly bytes, not the bytes-like input type.
        for attribute in (only_entry.name, only_entry.path):
            self.assertIs(type(attribute), bytes)
3414
@unittest.skipUnless(os.listdir in os.supports_fd,
                     'fd support for listdir required for this test.')
def test_fd(self):
    """Scan a directory through an open file descriptor."""
    self.assertIn(os.scandir, os.supports_fd)
    self.create_file('file.txt')
    expected_names = ['file.txt']
    if support.can_symlink():
        os.symlink('file.txt', os.path.join(self.path, 'link'))
        expected_names.append('link')

    dir_fd = os.open(self.path, os.O_RDONLY)
    try:
        with os.scandir(dir_fd) as scanner:
            found = list(scanner)
        found_names = [item.name for item in found]
        self.assertEqual(sorted(found_names), expected_names)
        self.assertEqual(found_names, os.listdir(dir_fd))

        # stat() results can only be cross-checked when os.stat() accepts
        # a dir_fd argument on this platform.
        can_stat_relative = os.stat in os.supports_dir_fd
        for item in found:
            # When scanning by descriptor, the entry path is just its name.
            self.assertEqual(item.path, item.name)
            self.assertEqual(os.fspath(item), item.name)
            self.assertEqual(item.is_symlink(), item.name == 'link')
            if can_stat_relative:
                reference = os.stat(item.name, dir_fd=dir_fd)
                self.assertEqual(item.stat(), reference)
                reference = os.stat(item.name, dir_fd=dir_fd,
                                    follow_symlinks=False)
                self.assertEqual(item.stat(follow_symlinks=False), reference)
    finally:
        os.close(dir_fd)
3443
Victor Stinner6036e442015-03-08 01:58:04 +01003444 def test_empty_path(self):
3445 self.assertRaises(FileNotFoundError, os.scandir, '')
3446
def test_consume_iterator_twice(self):
    """An exhausted scandir iterator can be consumed again and is empty."""
    self.create_file("file.txt")
    scanner = os.scandir(self.path)

    first_pass = list(scanner)
    self.assertEqual(len(first_pass), 1, first_pass)

    # Check that consuming the iterator a second time doesn't raise an
    # exception and yields nothing.
    second_pass = list(scanner)
    self.assertEqual(len(second_pass), 0, second_pass)
3457
3458 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003459 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003460 self.assertRaises(TypeError, os.scandir, obj)
3461
def test_close(self):
    """An explicitly closed iterator is released without ResourceWarning."""
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    scanner = os.scandir(self.path)
    # Consume only one of the two entries so the iterator is not exhausted.
    next(scanner)
    # close() is called twice: repeated closes must be harmless.
    for _ in range(2):
        scanner.close()
    with self.check_no_resource_warning():
        del scanner
3472
def test_context_manager(self):
    """Leaving the with block releases a partially consumed iterator."""
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    with os.scandir(self.path) as scanner:
        # Consume only one of the two entries so the iterator is not exhausted.
        next(scanner)
    with self.check_no_resource_warning():
        del scanner
3480
def test_context_manager_close(self):
    """Calling close() inside the with block must not break its exit."""
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    with os.scandir(self.path) as scanner:
        next(scanner)
        # Closed here explicitly and then again when the with block exits.
        scanner.close()
3487
def test_context_manager_exception(self):
    """An exception escaping the with block still releases the iterator."""
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    with self.assertRaises(ZeroDivisionError):
        with os.scandir(self.path) as scanner:
            next(scanner)
            # Deliberately raise while the iterator is only partly consumed.
            1 / 0
    with self.check_no_resource_warning():
        del scanner
3497
def test_resource_warning(self):
    """Only an unclosed, unexhausted iterator triggers ResourceWarning."""
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)

    # Partially consumed and never closed: dropping it must warn.
    pending = os.scandir(self.path)
    next(pending)
    with self.assertWarns(ResourceWarning):
        del pending
        support.gc_collect()

    # Exhausted iterator: dropping it must stay silent.
    exhausted = os.scandir(self.path)
    list(exhausted)
    with self.check_no_resource_warning():
        del exhausted
3511
Victor Stinner6036e442015-03-08 01:58:04 +01003512
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol (PEP 519).

    Relies on the module-level _PathLike helper class defined elsewhere in
    this file.  Loops over several inputs use subTest() so that a failure
    names the offending value and the remaining values are still checked.
    """

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes objects come back equal to the input.
        for b in b'hello', b'goodbye', b'some/path/and/file':
            with self.subTest(path=b):
                self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        # str objects come back equal to the input.
        for s in 'hello', 'goodbye', 'some/path/and/file':
            with self.subTest(path=s):
                self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        # Path-like objects wrapping either str or bytes must be accepted
        # by fspath(), os.fsencode() and os.fsdecode().
        for p in "path/like/object", b"path/like/object":
            with self.subTest(path=p):
                pathlike = _PathLike(p)

                self.assertEqual(p, self.fspath(pathlike))
                self.assertEqual(b"path/like/object", os.fsencode(pathlike))
                self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        # assertIsInstance reports the object and expected type on failure.
        self.assertIsInstance(_PathLike(), os.PathLike)

    def test_garbage_in_exception_out(self):
        # Objects that are not str, bytes or path-like must be rejected.
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            with self.subTest(obj=o):
                self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, _PathLike(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003558
Victor Stinnerc29b5852017-11-02 07:28:27 -07003559
class TimesTests(unittest.TestCase):
    """Sanity checks for the object returned by os.times()."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every named field must be exposed as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for name in field_names:
            self.assertIsInstance(getattr(result, name), float)

        if os.name != 'nt':
            return
        # On Windows these three fields are expected to be zero.
        for name in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(result, name), 0)
3574
3575
# The pure Python os._fspath() only needs its own test class when a separate
# C version is provided; otherwise TestPEP519 has already exercised the pure
# Python implementation.
try:
    os._fspath
except AttributeError:
    pass
else:
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003584
3585
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()