blob: 6658a61ea2a74a53430024bf756694cce5ebc489 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020064from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
Berker Peksag4af23d72016-09-15 20:32:44 +030084def requires_os_func(name):
85 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
86
87
Victor Stinnerae39d232016-03-24 17:12:55 +010088def create_file(filename, content=b'content'):
89 with open(filename, "xb", 0) as fp:
90 fp.write(content)
91
92
Thomas Wouters0e3f5912006-08-11 14:57:12 +000093# Tests creating TESTFN
94class FileTests(unittest.TestCase):
95 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000096 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000097 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000098 tearDown = setUp
99
100 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000101 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000102 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000103 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000104
Christian Heimesfdab48e2008-01-20 09:06:41 +0000105 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000106 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
107 # We must allocate two consecutive file descriptors, otherwise
108 # it will mess up other file descriptors (perhaps even the three
109 # standard ones).
110 second = os.dup(first)
111 try:
112 retries = 0
113 while second != first + 1:
114 os.close(first)
115 retries += 1
116 if retries > 10:
117 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000118 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000119 first, second = second, os.dup(second)
120 finally:
121 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000122 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000123 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000124 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000126 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000127 def test_rename(self):
128 path = support.TESTFN
129 old = sys.getrefcount(path)
130 self.assertRaises(TypeError, os.rename, path, 0)
131 new = sys.getrefcount(path)
132 self.assertEqual(old, new)
133
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000134 def test_read(self):
135 with open(support.TESTFN, "w+b") as fobj:
136 fobj.write(b"spam")
137 fobj.flush()
138 fd = fobj.fileno()
139 os.lseek(fd, 0, 0)
140 s = os.read(fd, 4)
141 self.assertEqual(type(s), bytes)
142 self.assertEqual(s, b"spam")
143
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200144 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200145 # Skip the test on 32-bit platforms: the number of bytes must fit in a
146 # Py_ssize_t type
147 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
148 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200149 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
150 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200151 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100152 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200153
154 # Issue #21932: Make sure that os.read() does not raise an
155 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200156 with open(support.TESTFN, "rb") as fp:
157 data = os.read(fp.fileno(), size)
158
Victor Stinner8c663fd2017-11-08 14:44:44 -0800159 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200160 # operating system is free to return less bytes than requested.
161 self.assertEqual(data, b'test')
162
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000163 def test_write(self):
164 # os.write() accepts bytes- and buffer-like objects but not strings
165 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
166 self.assertRaises(TypeError, os.write, fd, "beans")
167 os.write(fd, b"bacon\n")
168 os.write(fd, bytearray(b"eggs\n"))
169 os.write(fd, memoryview(b"spam\n"))
170 os.close(fd)
171 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000172 self.assertEqual(fobj.read().splitlines(),
173 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000174
Victor Stinnere0daff12011-03-20 23:36:35 +0100175 def write_windows_console(self, *args):
176 retcode = subprocess.call(args,
177 # use a new console to not flood the test output
178 creationflags=subprocess.CREATE_NEW_CONSOLE,
179 # use a shell to hide the console window (SW_HIDE)
180 shell=True)
181 self.assertEqual(retcode, 0)
182
183 @unittest.skipUnless(sys.platform == 'win32',
184 'test specific to the Windows console')
185 def test_write_windows_console(self):
186 # Issue #11395: the Windows console returns an error (12: not enough
187 # space error) on writing into stdout if stdout mode is binary and the
188 # length is greater than 66,000 bytes (or less, depending on heap
189 # usage).
190 code = "print('x' * 100000)"
191 self.write_windows_console(sys.executable, "-c", code)
192 self.write_windows_console(sys.executable, "-u", "-c", code)
193
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000194 def fdopen_helper(self, *args):
195 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200196 f = os.fdopen(fd, *args)
197 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000198
199 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200200 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
201 os.close(fd)
202
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000203 self.fdopen_helper()
204 self.fdopen_helper('r')
205 self.fdopen_helper('r', 100)
206
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100207 def test_replace(self):
208 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 self.addCleanup(support.unlink, support.TESTFN)
210 self.addCleanup(support.unlink, TESTFN2)
211
212 create_file(support.TESTFN, b"1")
213 create_file(TESTFN2, b"2")
214
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100215 os.replace(support.TESTFN, TESTFN2)
216 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
217 with open(TESTFN2, 'r') as f:
218 self.assertEqual(f.read(), "1")
219
Martin Panterbf19d162015-09-09 01:01:13 +0000220 def test_open_keywords(self):
221 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
222 dir_fd=None)
223 os.close(f)
224
225 def test_symlink_keywords(self):
226 symlink = support.get_attribute(os, "symlink")
227 try:
228 symlink(src='target', dst=support.TESTFN,
229 target_is_directory=False, dir_fd=None)
230 except (NotImplementedError, OSError):
231 pass # No OS support or unprivileged user
232
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200233
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234# Test attributes on return values from os.*stat* family.
235class StatAttributeTests(unittest.TestCase):
236 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200237 self.fname = support.TESTFN
238 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100239 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240
Serhiy Storchaka43767632013-11-03 21:31:38 +0200241 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000243 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244
245 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(result[stat.ST_SIZE], 3)
247 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 # Make sure all the attributes are there
250 members = dir(result)
251 for name in dir(stat):
252 if name[:3] == 'ST_':
253 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000254 if name.endswith("TIME"):
255 def trunc(x): return int(x)
256 else:
257 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000260 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700263 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700264 for name in 'st_atime st_mtime st_ctime'.split():
265 floaty = int(getattr(result, name) * 100000)
266 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700267 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 try:
270 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 except IndexError:
273 pass
274
275 # Make sure that assignment fails
276 try:
277 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200278 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000279 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 pass
281
282 try:
283 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200284 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000285 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 pass
287
288 try:
289 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except AttributeError:
292 pass
293
294 # Use the stat_result constructor with a too-short tuple.
295 try:
296 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 except TypeError:
299 pass
300
Ezio Melotti42da6632011-03-15 05:18:48 +0200301 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 try:
303 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
304 except TypeError:
305 pass
306
Antoine Pitrou38425292010-09-21 18:19:07 +0000307 def test_stat_attributes(self):
308 self.check_stat_attributes(self.fname)
309
310 def test_stat_attributes_bytes(self):
311 try:
312 fname = self.fname.encode(sys.getfilesystemencoding())
313 except UnicodeEncodeError:
314 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700329 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100340 self.assertTrue(isinstance(result.f_fsid, int))
341
342 # Test that the size of the tuple doesn't change
343 self.assertEqual(len(result), 10)
344
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700374 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Berker Peksag0b4dc482016-09-17 15:49:59 +0300434 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
435 def test_access_denied(self):
436 # Default to FindFirstFile WIN32_FIND_DATA when access is
437 # denied. See issue 28075.
438 # os.environ['TEMP'] should be located on a volume that
439 # supports file ACLs.
440 fname = os.path.join(os.environ['TEMP'], self.fname)
441 self.addCleanup(support.unlink, fname)
442 create_file(fname, b'ABC')
443 # Deny the right to [S]YNCHRONIZE on the file to
444 # force CreateFile to fail with ERROR_ACCESS_DENIED.
445 DETACHED_PROCESS = 8
446 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500447 # bpo-30584: Use security identifier *S-1-5-32-545 instead
448 # of localized "Users" to not depend on the locale.
449 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300450 creationflags=DETACHED_PROCESS
451 )
452 result = os.stat(fname)
453 self.assertNotEqual(result.st_size, 0)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
456class UtimeTests(unittest.TestCase):
457 def setUp(self):
458 self.dirname = support.TESTFN
459 self.fname = os.path.join(self.dirname, "f1")
460
461 self.addCleanup(support.rmtree, self.dirname)
462 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100463 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200464
Victor Stinner47aacc82015-06-12 17:26:23 +0200465 def support_subsecond(self, filename):
466 # Heuristic to check if the filesystem supports timestamp with
467 # subsecond resolution: check if float and int timestamps are different
468 st = os.stat(filename)
469 return ((st.st_atime != st[7])
470 or (st.st_mtime != st[8])
471 or (st.st_ctime != st[9]))
472
473 def _test_utime(self, set_time, filename=None):
474 if not filename:
475 filename = self.fname
476
477 support_subsecond = self.support_subsecond(filename)
478 if support_subsecond:
479 # Timestamp with a resolution of 1 microsecond (10^-6).
480 #
481 # The resolution of the C internal function used by os.utime()
482 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
483 # test with a resolution of 1 ns requires more work:
484 # see the issue #15745.
485 atime_ns = 1002003000 # 1.002003 seconds
486 mtime_ns = 4005006000 # 4.005006 seconds
487 else:
488 # use a resolution of 1 second
489 atime_ns = 5 * 10**9
490 mtime_ns = 8 * 10**9
491
492 set_time(filename, (atime_ns, mtime_ns))
493 st = os.stat(filename)
494
495 if support_subsecond:
496 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
497 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
498 else:
499 self.assertEqual(st.st_atime, atime_ns * 1e-9)
500 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
501 self.assertEqual(st.st_atime_ns, atime_ns)
502 self.assertEqual(st.st_mtime_ns, mtime_ns)
503
504 def test_utime(self):
505 def set_time(filename, ns):
506 # test the ns keyword parameter
507 os.utime(filename, ns=ns)
508 self._test_utime(set_time)
509
510 @staticmethod
511 def ns_to_sec(ns):
512 # Convert a number of nanosecond (int) to a number of seconds (float).
513 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
514 # issue, os.utime() rounds towards minus infinity.
515 return (ns * 1e-9) + 0.5e-9
516
517 def test_utime_by_indexed(self):
518 # pass times as floating point seconds as the second indexed parameter
519 def set_time(filename, ns):
520 atime_ns, mtime_ns = ns
521 atime = self.ns_to_sec(atime_ns)
522 mtime = self.ns_to_sec(mtime_ns)
523 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
524 # or utime(time_t)
525 os.utime(filename, (atime, mtime))
526 self._test_utime(set_time)
527
528 def test_utime_by_times(self):
529 def set_time(filename, ns):
530 atime_ns, mtime_ns = ns
531 atime = self.ns_to_sec(atime_ns)
532 mtime = self.ns_to_sec(mtime_ns)
533 # test the times keyword parameter
534 os.utime(filename, times=(atime, mtime))
535 self._test_utime(set_time)
536
537 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
538 "follow_symlinks support for utime required "
539 "for this test.")
540 def test_utime_nofollow_symlinks(self):
541 def set_time(filename, ns):
542 # use follow_symlinks=False to test utimensat(timespec)
543 # or lutimes(timeval)
544 os.utime(filename, ns=ns, follow_symlinks=False)
545 self._test_utime(set_time)
546
547 @unittest.skipUnless(os.utime in os.supports_fd,
548 "fd support for utime required for this test.")
549 def test_utime_fd(self):
550 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100551 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200552 # use a file descriptor to test futimens(timespec)
553 # or futimes(timeval)
554 os.utime(fp.fileno(), ns=ns)
555 self._test_utime(set_time)
556
557 @unittest.skipUnless(os.utime in os.supports_dir_fd,
558 "dir_fd support for utime required for this test.")
559 def test_utime_dir_fd(self):
560 def set_time(filename, ns):
561 dirname, name = os.path.split(filename)
562 dirfd = os.open(dirname, os.O_RDONLY)
563 try:
564 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
565 os.utime(name, dir_fd=dirfd, ns=ns)
566 finally:
567 os.close(dirfd)
568 self._test_utime(set_time)
569
570 def test_utime_directory(self):
571 def set_time(filename, ns):
572 # test calling os.utime() on a directory
573 os.utime(filename, ns=ns)
574 self._test_utime(set_time, filename=self.dirname)
575
576 def _test_utime_current(self, set_time):
577 # Get the system clock
578 current = time.time()
579
580 # Call os.utime() to set the timestamp to the current system clock
581 set_time(self.fname)
582
583 if not self.support_subsecond(self.fname):
584 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700585 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200586 # On Windows, the usual resolution of time.time() is 15.6 ms.
587 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700588 #
589 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
590 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200591 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200592 st = os.stat(self.fname)
593 msg = ("st_time=%r, current=%r, dt=%r"
594 % (st.st_mtime, current, st.st_mtime - current))
595 self.assertAlmostEqual(st.st_mtime, current,
596 delta=delta, msg=msg)
597
598 def test_utime_current(self):
599 def set_time(filename):
600 # Set to the current time in the new way
601 os.utime(self.fname)
602 self._test_utime_current(set_time)
603
604 def test_utime_current_old(self):
605 def set_time(filename):
606 # Set to the current time in the old explicit way.
607 os.utime(self.fname, None)
608 self._test_utime_current(set_time)
609
610 def get_file_system(self, path):
611 if sys.platform == 'win32':
612 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
613 import ctypes
614 kernel32 = ctypes.windll.kernel32
615 buf = ctypes.create_unicode_buffer("", 100)
616 ok = kernel32.GetVolumeInformationW(root, None, 0,
617 None, None, None,
618 buf, len(buf))
619 if ok:
620 return buf.value
621 # return None if the filesystem is unknown
622
623 def test_large_time(self):
624 # Many filesystems are limited to the year 2038. At least, the test
625 # pass with NTFS filesystem.
626 if self.get_file_system(self.dirname) != "NTFS":
627 self.skipTest("requires NTFS")
628
629 large = 5000000000 # some day in 2128
630 os.utime(self.fname, (large, large))
631 self.assertEqual(os.stat(self.fname).st_mtime, large)
632
633 def test_utime_invalid_arguments(self):
634 # seconds and nanoseconds parameters are mutually exclusive
635 with self.assertRaises(ValueError):
636 os.utime(self.fname, (5, 5), ns=(5, 5))
637
638
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000639from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000640
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000641class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000642 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000643 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000644
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000645 def setUp(self):
646 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000647 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000648 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000649 for key, value in self._reference().items():
650 os.environ[key] = value
651
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000652 def tearDown(self):
653 os.environ.clear()
654 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000655 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000656 os.environb.clear()
657 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000658
Christian Heimes90333392007-11-01 19:08:42 +0000659 def _reference(self):
660 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
661
662 def _empty_mapping(self):
663 os.environ.clear()
664 return os.environ
665
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000666 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200667 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
668 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000669 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000670 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300671 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200672 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300673 value = popen.read().strip()
674 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000675
Xavier de Gayed1415312016-07-22 12:15:29 +0200676 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
677 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000678 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200679 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
680 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300681 it = iter(popen)
682 self.assertEqual(next(it), "line1\n")
683 self.assertEqual(next(it), "line2\n")
684 self.assertEqual(next(it), "line3\n")
685 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000686
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000687 # Verify environ keys and values from the OS are of the
688 # correct str type.
689 def test_keyvalue_types(self):
690 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000691 self.assertEqual(type(key), str)
692 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000693
Christian Heimes90333392007-11-01 19:08:42 +0000694 def test_items(self):
695 for key, value in self._reference().items():
696 self.assertEqual(os.environ.get(key), value)
697
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000698 # Issue 7310
699 def test___repr__(self):
700 """Check that the repr() of os.environ looks like environ({...})."""
701 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000702 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
703 '{!r}: {!r}'.format(key, value)
704 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000705
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000706 def test_get_exec_path(self):
707 defpath_list = os.defpath.split(os.pathsep)
708 test_path = ['/monty', '/python', '', '/flying/circus']
709 test_env = {'PATH': os.pathsep.join(test_path)}
710
711 saved_environ = os.environ
712 try:
713 os.environ = dict(test_env)
714 # Test that defaulting to os.environ works.
715 self.assertSequenceEqual(test_path, os.get_exec_path())
716 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
717 finally:
718 os.environ = saved_environ
719
720 # No PATH environment variable
721 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
722 # Empty PATH environment variable
723 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
724 # Supplied PATH environment variable
725 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
726
Victor Stinnerb745a742010-05-18 17:17:23 +0000727 if os.supports_bytes_environ:
728 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000729 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000730 # ignore BytesWarning warning
731 with warnings.catch_warnings(record=True):
732 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000733 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000734 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000735 pass
736 else:
737 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000738
739 # bytes key and/or value
740 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
741 ['abc'])
742 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
743 ['abc'])
744 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
745 ['abc'])
746
747 @unittest.skipUnless(os.supports_bytes_environ,
748 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000749 def test_environb(self):
750 # os.environ -> os.environb
751 value = 'euro\u20ac'
752 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000753 value_bytes = value.encode(sys.getfilesystemencoding(),
754 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000755 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000756 msg = "U+20AC character is not encodable to %s" % (
757 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000758 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000759 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000760 self.assertEqual(os.environ['unicode'], value)
761 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000762
763 # os.environb -> os.environ
764 value = b'\xff'
765 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000766 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000767 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000768 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000769
Victor Stinner13ff2452018-01-22 18:32:50 +0100770 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100771 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100772 def test_unset_error(self):
773 if sys.platform == "win32":
774 # an environment variable is limited to 32,767 characters
775 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100776 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100777 else:
778 # "=" is not allowed in a variable name
779 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100780 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100781
Victor Stinner6d101392013-04-14 16:35:04 +0200782 def test_key_type(self):
783 missing = 'missingkey'
784 self.assertNotIn(missing, os.environ)
785
Victor Stinner839e5ea2013-04-14 16:43:03 +0200786 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200787 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200788 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200789 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200790
Victor Stinner839e5ea2013-04-14 16:43:03 +0200791 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200792 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200793 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200794 self.assertTrue(cm.exception.__suppress_context__)
795
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300796 def _test_environ_iteration(self, collection):
797 iterator = iter(collection)
798 new_key = "__new_key__"
799
800 next(iterator) # start iteration over os.environ.items
801
802 # add a new key in os.environ mapping
803 os.environ[new_key] = "test_environ_iteration"
804
805 try:
806 next(iterator) # force iteration over modified mapping
807 self.assertEqual(os.environ[new_key], "test_environ_iteration")
808 finally:
809 del os.environ[new_key]
810
811 def test_iter_error_when_changing_os_environ(self):
812 self._test_environ_iteration(os.environ)
813
814 def test_iter_error_when_changing_os_environ_items(self):
815 self._test_environ_iteration(os.environ.items())
816
817 def test_iter_error_when_changing_os_environ_values(self):
818 self._test_environ_iteration(os.environ.values())
819
Victor Stinner6d101392013-04-14 16:35:04 +0200820
Tim Petersc4e09402003-04-25 07:11:48 +0000821class WalkTests(unittest.TestCase):
822 """Tests for os.walk()."""
823
Victor Stinner0561c532015-03-12 10:28:24 +0100824 # Wrapper to hide minor differences between os.walk and os.fwalk
825 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200826 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200827 if 'follow_symlinks' in kwargs:
828 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200829 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100830
Charles-François Natali7372b062012-02-05 15:15:38 +0100831 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100832 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100833 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000834
835 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000836 # TESTFN/
837 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000838 # tmp1
839 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000840 # tmp2
841 # SUB11/ no kids
842 # SUB2/ a file kid and a dirsymlink kid
843 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300844 # SUB21/ not readable
845 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000846 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200847 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300848 # broken_link2
849 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000850 # TEST2/
851 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100852 self.walk_path = join(support.TESTFN, "TEST1")
853 self.sub1_path = join(self.walk_path, "SUB1")
854 self.sub11_path = join(self.sub1_path, "SUB11")
855 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300856 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100857 tmp1_path = join(self.walk_path, "tmp1")
858 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000859 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300860 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100861 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000862 t2_path = join(support.TESTFN, "TEST2")
863 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200864 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300865 broken_link2_path = join(sub2_path, "broken_link2")
866 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000867
868 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100869 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000870 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300871 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000872 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100873
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300874 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100875 with open(path, "x") as f:
876 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000877
Victor Stinner0561c532015-03-12 10:28:24 +0100878 if support.can_symlink():
879 os.symlink(os.path.abspath(t2_path), self.link_path)
880 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300881 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
882 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300883 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300884 ["broken_link", "broken_link2", "broken_link3",
885 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100886 else:
887 self.sub2_tree = (sub2_path, [], ["tmp3"])
888
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300889 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300890 try:
891 os.listdir(sub21_path)
892 except PermissionError:
893 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
894 else:
895 os.chmod(sub21_path, stat.S_IRWXU)
896 os.unlink(tmp5_path)
897 os.rmdir(sub21_path)
898 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300899
Victor Stinner0561c532015-03-12 10:28:24 +0100900 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000901 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300902 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100903
Tim Petersc4e09402003-04-25 07:11:48 +0000904 self.assertEqual(len(all), 4)
905 # We can't know which order SUB1 and SUB2 will appear in.
906 # Not flipped: TESTFN, SUB1, SUB11, SUB2
907 # flipped: TESTFN, SUB2, SUB1, SUB11
908 flipped = all[0][1][0] != "SUB1"
909 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200910 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300911 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100912 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
913 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
914 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
915 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000916
Brett Cannon3f9183b2016-08-26 14:44:48 -0700917 def test_walk_prune(self, walk_path=None):
918 if walk_path is None:
919 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000920 # Prune the search.
921 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700922 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000923 all.append((root, dirs, files))
924 # Don't descend into SUB1.
925 if 'SUB1' in dirs:
926 # Note that this also mutates the dirs we appended to all!
927 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000928
Victor Stinner0561c532015-03-12 10:28:24 +0100929 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200930 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100931
932 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300933 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100934 self.assertEqual(all[1], self.sub2_tree)
935
Brett Cannon3f9183b2016-08-26 14:44:48 -0700936 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200937 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700938
Victor Stinner0561c532015-03-12 10:28:24 +0100939 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000940 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100941 all = list(self.walk(self.walk_path, topdown=False))
942
Victor Stinner53b0a412016-03-26 01:12:36 +0100943 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000944 # We can't know which order SUB1 and SUB2 will appear in.
945 # Not flipped: SUB11, SUB1, SUB2, TESTFN
946 # flipped: SUB2, SUB11, SUB1, TESTFN
947 flipped = all[3][1][0] != "SUB1"
948 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200949 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300950 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100951 self.assertEqual(all[3],
952 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
953 self.assertEqual(all[flipped],
954 (self.sub11_path, [], []))
955 self.assertEqual(all[flipped + 1],
956 (self.sub1_path, ["SUB11"], ["tmp2"]))
957 self.assertEqual(all[2 - 2 * flipped],
958 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000959
Victor Stinner0561c532015-03-12 10:28:24 +0100960 def test_walk_symlink(self):
961 if not support.can_symlink():
962 self.skipTest("need symlink support")
963
964 # Walk, following symlinks.
965 walk_it = self.walk(self.walk_path, follow_symlinks=True)
966 for root, dirs, files in walk_it:
967 if root == self.link_path:
968 self.assertEqual(dirs, [])
969 self.assertEqual(files, ["tmp4"])
970 break
971 else:
972 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000973
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200974 def test_walk_bad_dir(self):
975 # Walk top-down.
976 errors = []
977 walk_it = self.walk(self.walk_path, onerror=errors.append)
978 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300979 self.assertEqual(errors, [])
980 dir1 = 'SUB1'
981 path1 = os.path.join(root, dir1)
982 path1new = os.path.join(root, dir1 + '.new')
983 os.rename(path1, path1new)
984 try:
985 roots = [r for r, d, f in walk_it]
986 self.assertTrue(errors)
987 self.assertNotIn(path1, roots)
988 self.assertNotIn(path1new, roots)
989 for dir2 in dirs:
990 if dir2 != dir1:
991 self.assertIn(os.path.join(root, dir2), roots)
992 finally:
993 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200994
Charles-François Natali7372b062012-02-05 15:15:38 +0100995
996@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
997class FwalkTests(WalkTests):
998 """Tests for os.fwalk()."""
999
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001000 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001001 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001002 yield (root, dirs, files)
1003
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001004 def fwalk(self, *args, **kwargs):
1005 return os.fwalk(*args, **kwargs)
1006
Larry Hastingsc48fe982012-06-25 04:49:05 -07001007 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1008 """
1009 compare with walk() results.
1010 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001011 walk_kwargs = walk_kwargs.copy()
1012 fwalk_kwargs = fwalk_kwargs.copy()
1013 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1014 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1015 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001016
Charles-François Natali7372b062012-02-05 15:15:38 +01001017 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001018 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001019 expected[root] = (set(dirs), set(files))
1020
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001021 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001022 self.assertIn(root, expected)
1023 self.assertEqual(expected[root], (set(dirs), set(files)))
1024
Larry Hastingsc48fe982012-06-25 04:49:05 -07001025 def test_compare_to_walk(self):
1026 kwargs = {'top': support.TESTFN}
1027 self._compare_to_walk(kwargs, kwargs)
1028
Charles-François Natali7372b062012-02-05 15:15:38 +01001029 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001030 try:
1031 fd = os.open(".", os.O_RDONLY)
1032 walk_kwargs = {'top': support.TESTFN}
1033 fwalk_kwargs = walk_kwargs.copy()
1034 fwalk_kwargs['dir_fd'] = fd
1035 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1036 finally:
1037 os.close(fd)
1038
1039 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001040 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001041 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1042 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001043 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001044 # check that the FD is valid
1045 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001046 # redundant check
1047 os.stat(rootfd)
1048 # check that listdir() returns consistent information
1049 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001050
1051 def test_fd_leak(self):
1052 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1053 # we both check that calling fwalk() a large number of times doesn't
1054 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1055 minfd = os.dup(1)
1056 os.close(minfd)
1057 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001058 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001059 pass
1060 newfd = os.dup(1)
1061 self.addCleanup(os.close, newfd)
1062 self.assertEqual(newfd, minfd)
1063
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001064class BytesWalkTests(WalkTests):
1065 """Tests for os.walk() with bytes."""
1066 def walk(self, top, **kwargs):
1067 if 'follow_symlinks' in kwargs:
1068 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1069 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1070 root = os.fsdecode(broot)
1071 dirs = list(map(os.fsdecode, bdirs))
1072 files = list(map(os.fsdecode, bfiles))
1073 yield (root, dirs, files)
1074 bdirs[:] = list(map(os.fsencode, dirs))
1075 bfiles[:] = list(map(os.fsencode, files))
1076
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001077@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1078class BytesFwalkTests(FwalkTests):
1079 """Tests for os.walk() with bytes."""
1080 def fwalk(self, top='.', *args, **kwargs):
1081 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1082 root = os.fsdecode(broot)
1083 dirs = list(map(os.fsdecode, bdirs))
1084 files = list(map(os.fsdecode, bfiles))
1085 yield (root, dirs, files, topfd)
1086 bdirs[:] = list(map(os.fsencode, dirs))
1087 bfiles[:] = list(map(os.fsencode, files))
1088
Charles-François Natali7372b062012-02-05 15:15:38 +01001089
Guido van Rossume7ba4952007-06-06 23:52:48 +00001090class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001091 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001092 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001093
1094 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001095 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001096 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1097 os.makedirs(path) # Should work
1098 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1099 os.makedirs(path)
1100
1101 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001102 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001103 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1104 os.makedirs(path)
1105 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1106 'dir5', 'dir6')
1107 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001108
Serhiy Storchakae304e332017-03-24 13:27:42 +02001109 def test_mode(self):
1110 with support.temp_umask(0o002):
1111 base = support.TESTFN
1112 parent = os.path.join(base, 'dir1')
1113 path = os.path.join(parent, 'dir2')
1114 os.makedirs(path, 0o555)
1115 self.assertTrue(os.path.exists(path))
1116 self.assertTrue(os.path.isdir(path))
1117 if os.name != 'nt':
1118 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1119 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1120
Terry Reedy5a22b652010-12-02 07:05:56 +00001121 def test_exist_ok_existing_directory(self):
1122 path = os.path.join(support.TESTFN, 'dir1')
1123 mode = 0o777
1124 old_mask = os.umask(0o022)
1125 os.makedirs(path, mode)
1126 self.assertRaises(OSError, os.makedirs, path, mode)
1127 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001128 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001129 os.makedirs(path, mode=mode, exist_ok=True)
1130 os.umask(old_mask)
1131
Martin Pantera82642f2015-11-19 04:48:44 +00001132 # Issue #25583: A drive root could raise PermissionError on Windows
1133 os.makedirs(os.path.abspath('/'), exist_ok=True)
1134
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001135 def test_exist_ok_s_isgid_directory(self):
1136 path = os.path.join(support.TESTFN, 'dir1')
1137 S_ISGID = stat.S_ISGID
1138 mode = 0o777
1139 old_mask = os.umask(0o022)
1140 try:
1141 existing_testfn_mode = stat.S_IMODE(
1142 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001143 try:
1144 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001145 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001146 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001147 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1148 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1149 # The os should apply S_ISGID from the parent dir for us, but
1150 # this test need not depend on that behavior. Be explicit.
1151 os.makedirs(path, mode | S_ISGID)
1152 # http://bugs.python.org/issue14992
1153 # Should not fail when the bit is already set.
1154 os.makedirs(path, mode, exist_ok=True)
1155 # remove the bit.
1156 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001157 # May work even when the bit is not already set when demanded.
1158 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001159 finally:
1160 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001161
1162 def test_exist_ok_existing_regular_file(self):
1163 base = support.TESTFN
1164 path = os.path.join(support.TESTFN, 'dir1')
1165 f = open(path, 'w')
1166 f.write('abc')
1167 f.close()
1168 self.assertRaises(OSError, os.makedirs, path)
1169 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1170 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1171 os.remove(path)
1172
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001173 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001174 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001175 'dir4', 'dir5', 'dir6')
1176 # If the tests failed, the bottom-most directory ('../dir6')
1177 # may not have been created, so we look for the outermost directory
1178 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001179 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001180 path = os.path.dirname(path)
1181
1182 os.removedirs(path)
1183
Andrew Svetlov405faed2012-12-25 12:18:09 +02001184
R David Murrayf2ad1732014-12-25 18:36:56 -05001185@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1186class ChownFileTests(unittest.TestCase):
1187
Berker Peksag036a71b2015-07-21 09:29:48 +03001188 @classmethod
1189 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001190 os.mkdir(support.TESTFN)
1191
1192 def test_chown_uid_gid_arguments_must_be_index(self):
1193 stat = os.stat(support.TESTFN)
1194 uid = stat.st_uid
1195 gid = stat.st_gid
1196 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1197 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1198 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1199 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1200 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1201
1202 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1203 def test_chown(self):
1204 gid_1, gid_2 = groups[:2]
1205 uid = os.stat(support.TESTFN).st_uid
1206 os.chown(support.TESTFN, uid, gid_1)
1207 gid = os.stat(support.TESTFN).st_gid
1208 self.assertEqual(gid, gid_1)
1209 os.chown(support.TESTFN, uid, gid_2)
1210 gid = os.stat(support.TESTFN).st_gid
1211 self.assertEqual(gid, gid_2)
1212
1213 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1214 "test needs root privilege and more than one user")
1215 def test_chown_with_root(self):
1216 uid_1, uid_2 = all_users[:2]
1217 gid = os.stat(support.TESTFN).st_gid
1218 os.chown(support.TESTFN, uid_1, gid)
1219 uid = os.stat(support.TESTFN).st_uid
1220 self.assertEqual(uid, uid_1)
1221 os.chown(support.TESTFN, uid_2, gid)
1222 uid = os.stat(support.TESTFN).st_uid
1223 self.assertEqual(uid, uid_2)
1224
1225 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1226 "test needs non-root account and more than one user")
1227 def test_chown_without_permission(self):
1228 uid_1, uid_2 = all_users[:2]
1229 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001230 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001231 os.chown(support.TESTFN, uid_1, gid)
1232 os.chown(support.TESTFN, uid_2, gid)
1233
Berker Peksag036a71b2015-07-21 09:29:48 +03001234 @classmethod
1235 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001236 os.rmdir(support.TESTFN)
1237
1238
Andrew Svetlov405faed2012-12-25 12:18:09 +02001239class RemoveDirsTests(unittest.TestCase):
1240 def setUp(self):
1241 os.makedirs(support.TESTFN)
1242
1243 def tearDown(self):
1244 support.rmtree(support.TESTFN)
1245
1246 def test_remove_all(self):
1247 dira = os.path.join(support.TESTFN, 'dira')
1248 os.mkdir(dira)
1249 dirb = os.path.join(dira, 'dirb')
1250 os.mkdir(dirb)
1251 os.removedirs(dirb)
1252 self.assertFalse(os.path.exists(dirb))
1253 self.assertFalse(os.path.exists(dira))
1254 self.assertFalse(os.path.exists(support.TESTFN))
1255
1256 def test_remove_partial(self):
1257 dira = os.path.join(support.TESTFN, 'dira')
1258 os.mkdir(dira)
1259 dirb = os.path.join(dira, 'dirb')
1260 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001261 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001262 os.removedirs(dirb)
1263 self.assertFalse(os.path.exists(dirb))
1264 self.assertTrue(os.path.exists(dira))
1265 self.assertTrue(os.path.exists(support.TESTFN))
1266
1267 def test_remove_nothing(self):
1268 dira = os.path.join(support.TESTFN, 'dira')
1269 os.mkdir(dira)
1270 dirb = os.path.join(dira, 'dirb')
1271 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001272 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001273 with self.assertRaises(OSError):
1274 os.removedirs(dirb)
1275 self.assertTrue(os.path.exists(dirb))
1276 self.assertTrue(os.path.exists(dira))
1277 self.assertTrue(os.path.exists(support.TESTFN))
1278
1279
Guido van Rossume7ba4952007-06-06 23:52:48 +00001280class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001281 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001282 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001283 f.write(b'hello')
1284 f.close()
1285 with open(os.devnull, 'rb') as f:
1286 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001287
Andrew Svetlov405faed2012-12-25 12:18:09 +02001288
Guido van Rossume7ba4952007-06-06 23:52:48 +00001289class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001290 def test_urandom_length(self):
1291 self.assertEqual(len(os.urandom(0)), 0)
1292 self.assertEqual(len(os.urandom(1)), 1)
1293 self.assertEqual(len(os.urandom(10)), 10)
1294 self.assertEqual(len(os.urandom(100)), 100)
1295 self.assertEqual(len(os.urandom(1000)), 1000)
1296
1297 def test_urandom_value(self):
1298 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001299 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001300 data2 = os.urandom(16)
1301 self.assertNotEqual(data1, data2)
1302
1303 def get_urandom_subprocess(self, count):
1304 code = '\n'.join((
1305 'import os, sys',
1306 'data = os.urandom(%s)' % count,
1307 'sys.stdout.buffer.write(data)',
1308 'sys.stdout.buffer.flush()'))
1309 out = assert_python_ok('-c', code)
1310 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001311 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001312 return stdout
1313
1314 def test_urandom_subprocess(self):
1315 data1 = self.get_urandom_subprocess(16)
1316 data2 = self.get_urandom_subprocess(16)
1317 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001318
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001319
Victor Stinner9b1f4742016-09-06 16:18:52 -07001320@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1321class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001322 @classmethod
1323 def setUpClass(cls):
1324 try:
1325 os.getrandom(1)
1326 except OSError as exc:
1327 if exc.errno == errno.ENOSYS:
1328 # Python compiled on a more recent Linux version
1329 # than the current Linux kernel
1330 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1331 else:
1332 raise
1333
Victor Stinner9b1f4742016-09-06 16:18:52 -07001334 def test_getrandom_type(self):
1335 data = os.getrandom(16)
1336 self.assertIsInstance(data, bytes)
1337 self.assertEqual(len(data), 16)
1338
1339 def test_getrandom0(self):
1340 empty = os.getrandom(0)
1341 self.assertEqual(empty, b'')
1342
1343 def test_getrandom_random(self):
1344 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1345
1346 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1347 # resource /dev/random
1348
1349 def test_getrandom_nonblock(self):
1350 # The call must not fail. Check also that the flag exists
1351 try:
1352 os.getrandom(1, os.GRND_NONBLOCK)
1353 except BlockingIOError:
1354 # System urandom is not initialized yet
1355 pass
1356
1357 def test_getrandom_value(self):
1358 data1 = os.getrandom(16)
1359 data2 = os.getrandom(16)
1360 self.assertNotEqual(data1, data2)
1361
1362
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001363# os.urandom() doesn't use a file descriptor when it is implemented with the
1364# getentropy() function, the getrandom() function or the getrandom() syscall
1365OS_URANDOM_DONT_USE_FD = (
1366 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1367 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1368 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001369
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001370@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1371 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001372class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001373 @unittest.skipUnless(resource, "test requires the resource module")
1374 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001375 # Check urandom() failing when it is not able to open /dev/random.
1376 # We spawn a new process to make the test more robust (if getrlimit()
1377 # failed to restore the file descriptor limit after this, the whole
1378 # test suite would crash; this actually happened on the OS X Tiger
1379 # buildbot).
1380 code = """if 1:
1381 import errno
1382 import os
1383 import resource
1384
1385 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1386 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1387 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001388 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001389 except OSError as e:
1390 assert e.errno == errno.EMFILE, e.errno
1391 else:
1392 raise AssertionError("OSError not raised")
1393 """
1394 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001395
Antoine Pitroue472aea2014-04-26 14:33:03 +02001396 def test_urandom_fd_closed(self):
1397 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1398 # closed.
1399 code = """if 1:
1400 import os
1401 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001402 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001403 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001404 with test.support.SuppressCrashReport():
1405 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001406 sys.stdout.buffer.write(os.urandom(4))
1407 """
1408 rc, out, err = assert_python_ok('-Sc', code)
1409
1410 def test_urandom_fd_reopened(self):
1411 # Issue #21207: urandom() should detect its fd to /dev/urandom
1412 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001413 self.addCleanup(support.unlink, support.TESTFN)
1414 create_file(support.TESTFN, b"x" * 256)
1415
Antoine Pitroue472aea2014-04-26 14:33:03 +02001416 code = """if 1:
1417 import os
1418 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001419 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001420 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001421 with test.support.SuppressCrashReport():
1422 for fd in range(3, 256):
1423 try:
1424 os.close(fd)
1425 except OSError:
1426 pass
1427 else:
1428 # Found the urandom fd (XXX hopefully)
1429 break
1430 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001431 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001432 new_fd = f.fileno()
1433 # Issue #26935: posix allows new_fd and fd to be equal but
1434 # some libc implementations have dup2 return an error in this
1435 # case.
1436 if new_fd != fd:
1437 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001438 sys.stdout.buffer.write(os.urandom(4))
1439 sys.stdout.buffer.write(os.urandom(4))
1440 """.format(TESTFN=support.TESTFN)
1441 rc, out, err = assert_python_ok('-Sc', code)
1442 self.assertEqual(len(out), 8)
1443 self.assertNotEqual(out[0:4], out[4:8])
1444 rc, out2, err2 = assert_python_ok('-Sc', code)
1445 self.assertEqual(len(out2), 8)
1446 self.assertNotEqual(out2, out)
1447
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001448
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001449@contextlib.contextmanager
1450def _execvpe_mockup(defpath=None):
1451 """
1452 Stubs out execv and execve functions when used as context manager.
1453 Records exec calls. The mock execv and execve functions always raise an
1454 exception as they would normally never return.
1455 """
1456 # A list of tuples containing (function name, first arg, args)
1457 # of calls to execv or execve that have been made.
1458 calls = []
1459
1460 def mock_execv(name, *args):
1461 calls.append(('execv', name, args))
1462 raise RuntimeError("execv called")
1463
1464 def mock_execve(name, *args):
1465 calls.append(('execve', name, args))
1466 raise OSError(errno.ENOTDIR, "execve called")
1467
1468 try:
1469 orig_execv = os.execv
1470 orig_execve = os.execve
1471 orig_defpath = os.defpath
1472 os.execv = mock_execv
1473 os.execve = mock_execve
1474 if defpath is not None:
1475 os.defpath = defpath
1476 yield calls
1477 finally:
1478 os.execv = orig_execv
1479 os.execve = orig_execve
1480 os.defpath = orig_defpath
1481
Victor Stinner4659ccf2016-09-14 10:57:00 +02001482
Guido van Rossume7ba4952007-06-06 23:52:48 +00001483class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001484 @unittest.skipIf(USING_LINUXTHREADS,
1485 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001486 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001487 self.assertRaises(OSError, os.execvpe, 'no such app-',
1488 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001489
Steve Dowerbce26262016-11-19 19:17:26 -08001490 def test_execv_with_bad_arglist(self):
1491 self.assertRaises(ValueError, os.execv, 'notepad', ())
1492 self.assertRaises(ValueError, os.execv, 'notepad', [])
1493 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1494 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1495
Thomas Heller6790d602007-08-30 17:15:14 +00001496 def test_execvpe_with_bad_arglist(self):
1497 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001498 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1499 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001500
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001501 @unittest.skipUnless(hasattr(os, '_execvpe'),
1502 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001503 def _test_internal_execvpe(self, test_type):
1504 program_path = os.sep + 'absolutepath'
1505 if test_type is bytes:
1506 program = b'executable'
1507 fullpath = os.path.join(os.fsencode(program_path), program)
1508 native_fullpath = fullpath
1509 arguments = [b'progname', 'arg1', 'arg2']
1510 else:
1511 program = 'executable'
1512 arguments = ['progname', 'arg1', 'arg2']
1513 fullpath = os.path.join(program_path, program)
1514 if os.name != "nt":
1515 native_fullpath = os.fsencode(fullpath)
1516 else:
1517 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001518 env = {'spam': 'beans'}
1519
Victor Stinnerb745a742010-05-18 17:17:23 +00001520 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001521 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001522 self.assertRaises(RuntimeError,
1523 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001524 self.assertEqual(len(calls), 1)
1525 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1526
Victor Stinnerb745a742010-05-18 17:17:23 +00001527 # test os._execvpe() with a relative path:
1528 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001529 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001530 self.assertRaises(OSError,
1531 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001532 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001533 self.assertSequenceEqual(calls[0],
1534 ('execve', native_fullpath, (arguments, env)))
1535
1536 # test os._execvpe() with a relative path:
1537 # os.get_exec_path() reads the 'PATH' variable
1538 with _execvpe_mockup() as calls:
1539 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001540 if test_type is bytes:
1541 env_path[b'PATH'] = program_path
1542 else:
1543 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001544 self.assertRaises(OSError,
1545 os._execvpe, program, arguments, env=env_path)
1546 self.assertEqual(len(calls), 1)
1547 self.assertSequenceEqual(calls[0],
1548 ('execve', native_fullpath, (arguments, env_path)))
1549
1550 def test_internal_execvpe_str(self):
1551 self._test_internal_execvpe(str)
1552 if os.name != "nt":
1553 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001554
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001555 def test_execve_invalid_env(self):
1556 args = [sys.executable, '-c', 'pass']
1557
Ville Skyttä49b27342017-08-03 09:00:59 +03001558 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001559 newenv = os.environ.copy()
1560 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1561 with self.assertRaises(ValueError):
1562 os.execve(args[0], args, newenv)
1563
Ville Skyttä49b27342017-08-03 09:00:59 +03001564 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001565 newenv = os.environ.copy()
1566 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1567 with self.assertRaises(ValueError):
1568 os.execve(args[0], args, newenv)
1569
Ville Skyttä49b27342017-08-03 09:00:59 +03001570 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001571 newenv = os.environ.copy()
1572 newenv["FRUIT=ORANGE"] = "lemon"
1573 with self.assertRaises(ValueError):
1574 os.execve(args[0], args, newenv)
1575
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001576
Serhiy Storchaka43767632013-11-03 21:31:38 +02001577@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001578class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001579 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001580 try:
1581 os.stat(support.TESTFN)
1582 except FileNotFoundError:
1583 exists = False
1584 except OSError as exc:
1585 exists = True
1586 self.fail("file %s must not exist; os.stat failed with %s"
1587 % (support.TESTFN, exc))
1588 else:
1589 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001590
Thomas Wouters477c8d52006-05-27 19:21:47 +00001591 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001592 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001593
1594 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001595 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001596
1597 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001598 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001599
1600 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001601 self.addCleanup(support.unlink, support.TESTFN)
1602
Victor Stinnere77c9742016-03-25 10:28:23 +01001603 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001604 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001605
1606 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001607 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001608
Thomas Wouters477c8d52006-05-27 19:21:47 +00001609 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001610 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001611
Victor Stinnere77c9742016-03-25 10:28:23 +01001612
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001613class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001614 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001615 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1616 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001617 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001618 def get_single(f):
1619 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001620 if hasattr(os, f):
1621 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001622 return helper
1623 for f in singles:
1624 locals()["test_"+f] = get_single(f)
1625
Benjamin Peterson7522c742009-01-19 21:00:09 +00001626 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001627 try:
1628 f(support.make_bad_fd(), *args)
1629 except OSError as e:
1630 self.assertEqual(e.errno, errno.EBADF)
1631 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001632 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001633 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001634
Serhiy Storchaka43767632013-11-03 21:31:38 +02001635 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001636 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001638
Serhiy Storchaka43767632013-11-03 21:31:38 +02001639 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001640 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001641 fd = support.make_bad_fd()
1642 # Make sure none of the descriptors we are about to close are
1643 # currently valid (issue 6542).
1644 for i in range(10):
1645 try: os.fstat(fd+i)
1646 except OSError:
1647 pass
1648 else:
1649 break
1650 if i < 2:
1651 raise unittest.SkipTest(
1652 "Unable to acquire a range of invalid file descriptors")
1653 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001657 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001658
Serhiy Storchaka43767632013-11-03 21:31:38 +02001659 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001660 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001661 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001666
Serhiy Storchaka43767632013-11-03 21:31:38 +02001667 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001668 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001669 self.check(os.pathconf, "PC_NAME_MAX")
1670 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001671
Serhiy Storchaka43767632013-11-03 21:31:38 +02001672 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001673 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001674 self.check(os.truncate, 0)
1675 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001678 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001679 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001680
Serhiy Storchaka43767632013-11-03 21:31:38 +02001681 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001682 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684
Victor Stinner57ddf782014-01-08 15:21:28 +01001685 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1686 def test_readv(self):
1687 buf = bytearray(10)
1688 self.check(os.readv, [buf])
1689
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001691 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001692 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001693
Serhiy Storchaka43767632013-11-03 21:31:38 +02001694 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001695 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001696 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001697
Victor Stinner57ddf782014-01-08 15:21:28 +01001698 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1699 def test_writev(self):
1700 self.check(os.writev, [b'abc'])
1701
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001702 def test_inheritable(self):
1703 self.check(os.get_inheritable)
1704 self.check(os.set_inheritable, True)
1705
1706 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1707 'needs os.get_blocking() and os.set_blocking()')
1708 def test_blocking(self):
1709 self.check(os.get_blocking)
1710 self.check(os.set_blocking, True)
1711
Brian Curtin1b9df392010-11-24 20:24:31 +00001712
1713class LinkTests(unittest.TestCase):
1714 def setUp(self):
1715 self.file1 = support.TESTFN
1716 self.file2 = os.path.join(support.TESTFN + "2")
1717
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001718 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001719 for file in (self.file1, self.file2):
1720 if os.path.exists(file):
1721 os.unlink(file)
1722
Brian Curtin1b9df392010-11-24 20:24:31 +00001723 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001724 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001725
xdegaye6a55d092017-11-12 17:57:04 +01001726 try:
1727 os.link(file1, file2)
1728 except PermissionError as e:
1729 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001730 with open(file1, "r") as f1, open(file2, "r") as f2:
1731 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1732
1733 def test_link(self):
1734 self._test_link(self.file1, self.file2)
1735
1736 def test_link_bytes(self):
1737 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1738 bytes(self.file2, sys.getfilesystemencoding()))
1739
Brian Curtinf498b752010-11-30 15:54:04 +00001740 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001741 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001742 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001743 except UnicodeError:
1744 raise unittest.SkipTest("Unable to encode for this platform.")
1745
Brian Curtinf498b752010-11-30 15:54:04 +00001746 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001747 self.file2 = self.file1 + "2"
1748 self._test_link(self.file1, self.file2)
1749
Serhiy Storchaka43767632013-11-03 21:31:38 +02001750@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1751class PosixUidGidTests(unittest.TestCase):
1752 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1753 def test_setuid(self):
1754 if os.getuid() != 0:
1755 self.assertRaises(OSError, os.setuid, 0)
1756 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001757
Serhiy Storchaka43767632013-11-03 21:31:38 +02001758 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1759 def test_setgid(self):
1760 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1761 self.assertRaises(OSError, os.setgid, 0)
1762 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001763
Serhiy Storchaka43767632013-11-03 21:31:38 +02001764 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1765 def test_seteuid(self):
1766 if os.getuid() != 0:
1767 self.assertRaises(OSError, os.seteuid, 0)
1768 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001769
Serhiy Storchaka43767632013-11-03 21:31:38 +02001770 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1771 def test_setegid(self):
1772 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1773 self.assertRaises(OSError, os.setegid, 0)
1774 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001775
Serhiy Storchaka43767632013-11-03 21:31:38 +02001776 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1777 def test_setreuid(self):
1778 if os.getuid() != 0:
1779 self.assertRaises(OSError, os.setreuid, 0, 0)
1780 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1781 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001782
Serhiy Storchaka43767632013-11-03 21:31:38 +02001783 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1784 def test_setreuid_neg1(self):
1785 # Needs to accept -1. We run this in a subprocess to avoid
1786 # altering the test runner's process state (issue8045).
1787 subprocess.check_call([
1788 sys.executable, '-c',
1789 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001790
Serhiy Storchaka43767632013-11-03 21:31:38 +02001791 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1792 def test_setregid(self):
1793 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1794 self.assertRaises(OSError, os.setregid, 0, 0)
1795 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1796 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001797
Serhiy Storchaka43767632013-11-03 21:31:38 +02001798 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1799 def test_setregid_neg1(self):
1800 # Needs to accept -1. We run this in a subprocess to avoid
1801 # altering the test runner's process state (issue8045).
1802 subprocess.check_call([
1803 sys.executable, '-c',
1804 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001805
Serhiy Storchaka43767632013-11-03 21:31:38 +02001806@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1807class Pep383Tests(unittest.TestCase):
1808 def setUp(self):
1809 if support.TESTFN_UNENCODABLE:
1810 self.dir = support.TESTFN_UNENCODABLE
1811 elif support.TESTFN_NONASCII:
1812 self.dir = support.TESTFN_NONASCII
1813 else:
1814 self.dir = support.TESTFN
1815 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001816
Serhiy Storchaka43767632013-11-03 21:31:38 +02001817 bytesfn = []
1818 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001819 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001820 fn = os.fsencode(fn)
1821 except UnicodeEncodeError:
1822 return
1823 bytesfn.append(fn)
1824 add_filename(support.TESTFN_UNICODE)
1825 if support.TESTFN_UNENCODABLE:
1826 add_filename(support.TESTFN_UNENCODABLE)
1827 if support.TESTFN_NONASCII:
1828 add_filename(support.TESTFN_NONASCII)
1829 if not bytesfn:
1830 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001831
Serhiy Storchaka43767632013-11-03 21:31:38 +02001832 self.unicodefn = set()
1833 os.mkdir(self.dir)
1834 try:
1835 for fn in bytesfn:
1836 support.create_empty_file(os.path.join(self.bdir, fn))
1837 fn = os.fsdecode(fn)
1838 if fn in self.unicodefn:
1839 raise ValueError("duplicate filename")
1840 self.unicodefn.add(fn)
1841 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001842 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001843 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001844
Serhiy Storchaka43767632013-11-03 21:31:38 +02001845 def tearDown(self):
1846 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001847
Serhiy Storchaka43767632013-11-03 21:31:38 +02001848 def test_listdir(self):
1849 expected = self.unicodefn
1850 found = set(os.listdir(self.dir))
1851 self.assertEqual(found, expected)
1852 # test listdir without arguments
1853 current_directory = os.getcwd()
1854 try:
1855 os.chdir(os.sep)
1856 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1857 finally:
1858 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001859
Serhiy Storchaka43767632013-11-03 21:31:38 +02001860 def test_open(self):
1861 for fn in self.unicodefn:
1862 f = open(os.path.join(self.dir, fn), 'rb')
1863 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001864
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865 @unittest.skipUnless(hasattr(os, 'statvfs'),
1866 "need os.statvfs()")
1867 def test_statvfs(self):
1868 # issue #9645
1869 for fn in self.unicodefn:
1870 # should not fail with file not found error
1871 fullname = os.path.join(self.dir, fn)
1872 os.statvfs(fullname)
1873
1874 def test_stat(self):
1875 for fn in self.unicodefn:
1876 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001877
Brian Curtineb24d742010-04-12 17:16:38 +00001878@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1879class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001880 def _kill(self, sig):
1881 # Start sys.executable as a subprocess and communicate from the
1882 # subprocess to the parent that the interpreter is ready. When it
1883 # becomes ready, send *sig* via os.kill to the subprocess and check
1884 # that the return code is equal to *sig*.
1885 import ctypes
1886 from ctypes import wintypes
1887 import msvcrt
1888
1889 # Since we can't access the contents of the process' stdout until the
1890 # process has exited, use PeekNamedPipe to see what's inside stdout
1891 # without waiting. This is done so we can tell that the interpreter
1892 # is started and running at a point where it could handle a signal.
1893 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1894 PeekNamedPipe.restype = wintypes.BOOL
1895 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1896 ctypes.POINTER(ctypes.c_char), # stdout buf
1897 wintypes.DWORD, # Buffer size
1898 ctypes.POINTER(wintypes.DWORD), # bytes read
1899 ctypes.POINTER(wintypes.DWORD), # bytes avail
1900 ctypes.POINTER(wintypes.DWORD)) # bytes left
1901 msg = "running"
1902 proc = subprocess.Popen([sys.executable, "-c",
1903 "import sys;"
1904 "sys.stdout.write('{}');"
1905 "sys.stdout.flush();"
1906 "input()".format(msg)],
1907 stdout=subprocess.PIPE,
1908 stderr=subprocess.PIPE,
1909 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001910 self.addCleanup(proc.stdout.close)
1911 self.addCleanup(proc.stderr.close)
1912 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001913
1914 count, max = 0, 100
1915 while count < max and proc.poll() is None:
1916 # Create a string buffer to store the result of stdout from the pipe
1917 buf = ctypes.create_string_buffer(len(msg))
1918 # Obtain the text currently in proc.stdout
1919 # Bytes read/avail/left are left as NULL and unused
1920 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1921 buf, ctypes.sizeof(buf), None, None, None)
1922 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1923 if buf.value:
1924 self.assertEqual(msg, buf.value.decode())
1925 break
1926 time.sleep(0.1)
1927 count += 1
1928 else:
1929 self.fail("Did not receive communication from the subprocess")
1930
Brian Curtineb24d742010-04-12 17:16:38 +00001931 os.kill(proc.pid, sig)
1932 self.assertEqual(proc.wait(), sig)
1933
1934 def test_kill_sigterm(self):
1935 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001936 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001937
1938 def test_kill_int(self):
1939 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001940 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001941
1942 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001943 tagname = "test_os_%s" % uuid.uuid1()
1944 m = mmap.mmap(-1, 1, tagname)
1945 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001946 # Run a script which has console control handling enabled.
1947 proc = subprocess.Popen([sys.executable,
1948 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001949 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001950 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1951 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001952 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001953 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001954 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001955 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001956 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001957 count += 1
1958 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001959 # Forcefully kill the process if we weren't able to signal it.
1960 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001961 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001962 os.kill(proc.pid, event)
1963 # proc.send_signal(event) could also be done here.
1964 # Allow time for the signal to be passed and the process to exit.
1965 time.sleep(0.5)
1966 if not proc.poll():
1967 # Forcefully kill the process if we weren't able to signal it.
1968 os.kill(proc.pid, signal.SIGINT)
1969 self.fail("subprocess did not stop on {}".format(name))
1970
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001971 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001972 def test_CTRL_C_EVENT(self):
1973 from ctypes import wintypes
1974 import ctypes
1975
1976 # Make a NULL value by creating a pointer with no argument.
1977 NULL = ctypes.POINTER(ctypes.c_int)()
1978 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1979 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1980 wintypes.BOOL)
1981 SetConsoleCtrlHandler.restype = wintypes.BOOL
1982
1983 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001984 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001985 # by subprocesses.
1986 SetConsoleCtrlHandler(NULL, 0)
1987
1988 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1989
1990 def test_CTRL_BREAK_EVENT(self):
1991 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1992
1993
Brian Curtind40e6f72010-07-08 21:39:08 +00001994@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001995class Win32ListdirTests(unittest.TestCase):
1996 """Test listdir on Windows."""
1997
1998 def setUp(self):
1999 self.created_paths = []
2000 for i in range(2):
2001 dir_name = 'SUB%d' % i
2002 dir_path = os.path.join(support.TESTFN, dir_name)
2003 file_name = 'FILE%d' % i
2004 file_path = os.path.join(support.TESTFN, file_name)
2005 os.makedirs(dir_path)
2006 with open(file_path, 'w') as f:
2007 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2008 self.created_paths.extend([dir_name, file_name])
2009 self.created_paths.sort()
2010
2011 def tearDown(self):
2012 shutil.rmtree(support.TESTFN)
2013
2014 def test_listdir_no_extended_path(self):
2015 """Test when the path is not an "extended" path."""
2016 # unicode
2017 self.assertEqual(
2018 sorted(os.listdir(support.TESTFN)),
2019 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002020
Tim Golden781bbeb2013-10-25 20:24:06 +01002021 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002022 self.assertEqual(
2023 sorted(os.listdir(os.fsencode(support.TESTFN))),
2024 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002025
2026 def test_listdir_extended_path(self):
2027 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002028 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002029 # unicode
2030 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2031 self.assertEqual(
2032 sorted(os.listdir(path)),
2033 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002034
Tim Golden781bbeb2013-10-25 20:24:06 +01002035 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002036 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2037 self.assertEqual(
2038 sorted(os.listdir(path)),
2039 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002040
2041
Berker Peksage0b5b202018-08-15 13:03:41 +03002042@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2043class ReadlinkTests(unittest.TestCase):
2044 filelink = 'readlinktest'
2045 filelink_target = os.path.abspath(__file__)
2046 filelinkb = os.fsencode(filelink)
2047 filelinkb_target = os.fsencode(filelink_target)
2048
2049 def setUp(self):
2050 self.assertTrue(os.path.exists(self.filelink_target))
2051 self.assertTrue(os.path.exists(self.filelinkb_target))
2052 self.assertFalse(os.path.exists(self.filelink))
2053 self.assertFalse(os.path.exists(self.filelinkb))
2054
2055 def test_not_symlink(self):
2056 filelink_target = FakePath(self.filelink_target)
2057 self.assertRaises(OSError, os.readlink, self.filelink_target)
2058 self.assertRaises(OSError, os.readlink, filelink_target)
2059
2060 def test_missing_link(self):
2061 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2062 self.assertRaises(FileNotFoundError, os.readlink,
2063 FakePath('missing-link'))
2064
2065 @support.skip_unless_symlink
2066 def test_pathlike(self):
2067 os.symlink(self.filelink_target, self.filelink)
2068 self.addCleanup(support.unlink, self.filelink)
2069 filelink = FakePath(self.filelink)
2070 self.assertEqual(os.readlink(filelink), self.filelink_target)
2071
2072 @support.skip_unless_symlink
2073 def test_pathlike_bytes(self):
2074 os.symlink(self.filelinkb_target, self.filelinkb)
2075 self.addCleanup(support.unlink, self.filelinkb)
2076 path = os.readlink(FakePath(self.filelinkb))
2077 self.assertEqual(path, self.filelinkb_target)
2078 self.assertIsInstance(path, bytes)
2079
2080 @support.skip_unless_symlink
2081 def test_bytes(self):
2082 os.symlink(self.filelinkb_target, self.filelinkb)
2083 self.addCleanup(support.unlink, self.filelinkb)
2084 path = os.readlink(self.filelinkb)
2085 self.assertEqual(path, self.filelinkb_target)
2086 self.assertIsInstance(path, bytes)
2087
2088
Tim Golden781bbeb2013-10-25 20:24:06 +01002089@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002090@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002091class Win32SymlinkTests(unittest.TestCase):
2092 filelink = 'filelinktest'
2093 filelink_target = os.path.abspath(__file__)
2094 dirlink = 'dirlinktest'
2095 dirlink_target = os.path.dirname(filelink_target)
2096 missing_link = 'missing link'
2097
2098 def setUp(self):
2099 assert os.path.exists(self.dirlink_target)
2100 assert os.path.exists(self.filelink_target)
2101 assert not os.path.exists(self.dirlink)
2102 assert not os.path.exists(self.filelink)
2103 assert not os.path.exists(self.missing_link)
2104
2105 def tearDown(self):
2106 if os.path.exists(self.filelink):
2107 os.remove(self.filelink)
2108 if os.path.exists(self.dirlink):
2109 os.rmdir(self.dirlink)
2110 if os.path.lexists(self.missing_link):
2111 os.remove(self.missing_link)
2112
2113 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002114 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002115 self.assertTrue(os.path.exists(self.dirlink))
2116 self.assertTrue(os.path.isdir(self.dirlink))
2117 self.assertTrue(os.path.islink(self.dirlink))
2118 self.check_stat(self.dirlink, self.dirlink_target)
2119
2120 def test_file_link(self):
2121 os.symlink(self.filelink_target, self.filelink)
2122 self.assertTrue(os.path.exists(self.filelink))
2123 self.assertTrue(os.path.isfile(self.filelink))
2124 self.assertTrue(os.path.islink(self.filelink))
2125 self.check_stat(self.filelink, self.filelink_target)
2126
2127 def _create_missing_dir_link(self):
2128 'Create a "directory" link to a non-existent target'
2129 linkname = self.missing_link
2130 if os.path.lexists(linkname):
2131 os.remove(linkname)
2132 target = r'c:\\target does not exist.29r3c740'
2133 assert not os.path.exists(target)
2134 target_is_dir = True
2135 os.symlink(target, linkname, target_is_dir)
2136
2137 def test_remove_directory_link_to_missing_target(self):
2138 self._create_missing_dir_link()
2139 # For compatibility with Unix, os.remove will check the
2140 # directory status and call RemoveDirectory if the symlink
2141 # was created with target_is_dir==True.
2142 os.remove(self.missing_link)
2143
2144 @unittest.skip("currently fails; consider for improvement")
2145 def test_isdir_on_directory_link_to_missing_target(self):
2146 self._create_missing_dir_link()
2147 # consider having isdir return true for directory links
2148 self.assertTrue(os.path.isdir(self.missing_link))
2149
2150 @unittest.skip("currently fails; consider for improvement")
2151 def test_rmdir_on_directory_link_to_missing_target(self):
2152 self._create_missing_dir_link()
2153 # consider allowing rmdir to remove directory links
2154 os.rmdir(self.missing_link)
2155
2156 def check_stat(self, link, target):
2157 self.assertEqual(os.stat(link), os.stat(target))
2158 self.assertNotEqual(os.lstat(link), os.stat(link))
2159
Brian Curtind25aef52011-06-13 15:16:04 -05002160 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002161 self.assertEqual(os.stat(bytes_link), os.stat(target))
2162 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002163
2164 def test_12084(self):
2165 level1 = os.path.abspath(support.TESTFN)
2166 level2 = os.path.join(level1, "level2")
2167 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002168 self.addCleanup(support.rmtree, level1)
2169
2170 os.mkdir(level1)
2171 os.mkdir(level2)
2172 os.mkdir(level3)
2173
2174 file1 = os.path.abspath(os.path.join(level1, "file1"))
2175 create_file(file1)
2176
2177 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002178 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002179 os.chdir(level2)
2180 link = os.path.join(level2, "link")
2181 os.symlink(os.path.relpath(file1), "link")
2182 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002183
Victor Stinnerae39d232016-03-24 17:12:55 +01002184 # Check os.stat calls from the same dir as the link
2185 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002186
Victor Stinnerae39d232016-03-24 17:12:55 +01002187 # Check os.stat calls from a dir below the link
2188 os.chdir(level1)
2189 self.assertEqual(os.stat(file1),
2190 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002191
Victor Stinnerae39d232016-03-24 17:12:55 +01002192 # Check os.stat calls from a dir above the link
2193 os.chdir(level3)
2194 self.assertEqual(os.stat(file1),
2195 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002196 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002197 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002198
SSE43c34aad2018-02-13 00:10:35 +07002199 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2200 and os.path.exists(r'C:\ProgramData'),
2201 'Test directories not found')
2202 def test_29248(self):
2203 # os.symlink() calls CreateSymbolicLink, which creates
2204 # the reparse data buffer with the print name stored
2205 # first, so the offset is always 0. CreateSymbolicLink
2206 # stores the "PrintName" DOS path (e.g. "C:\") first,
2207 # with an offset of 0, followed by the "SubstituteName"
2208 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2209 # the other hand, seems to have been created manually
2210 # with an inverted order.
2211 target = os.readlink(r'C:\Users\All Users')
2212 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2213
Steve Dower6921e732018-03-05 14:26:08 -08002214 def test_buffer_overflow(self):
2215 # Older versions would have a buffer overflow when detecting
2216 # whether a link source was a directory. This test ensures we
2217 # no longer crash, but does not otherwise validate the behavior
2218 segment = 'X' * 27
2219 path = os.path.join(*[segment] * 10)
2220 test_cases = [
2221 # overflow with absolute src
2222 ('\\' + path, segment),
2223 # overflow dest with relative src
2224 (segment, path),
2225 # overflow when joining src
2226 (path[:180], path[:180]),
2227 ]
2228 for src, dest in test_cases:
2229 try:
2230 os.symlink(src, dest)
2231 except FileNotFoundError:
2232 pass
2233 else:
2234 try:
2235 os.remove(dest)
2236 except OSError:
2237 pass
2238 # Also test with bytes, since that is a separate code path.
2239 try:
2240 os.symlink(os.fsencode(src), os.fsencode(dest))
2241 except FileNotFoundError:
2242 pass
2243 else:
2244 try:
2245 os.remove(dest)
2246 except OSError:
2247 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002248
Tim Golden0321cf22014-05-05 19:46:17 +01002249@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2250class Win32JunctionTests(unittest.TestCase):
2251 junction = 'junctiontest'
2252 junction_target = os.path.dirname(os.path.abspath(__file__))
2253
2254 def setUp(self):
2255 assert os.path.exists(self.junction_target)
2256 assert not os.path.exists(self.junction)
2257
2258 def tearDown(self):
2259 if os.path.exists(self.junction):
2260 # os.rmdir delegates to Windows' RemoveDirectoryW,
2261 # which removes junction points safely.
2262 os.rmdir(self.junction)
2263
2264 def test_create_junction(self):
2265 _winapi.CreateJunction(self.junction_target, self.junction)
2266 self.assertTrue(os.path.exists(self.junction))
2267 self.assertTrue(os.path.isdir(self.junction))
2268
2269 # Junctions are not recognized as links.
2270 self.assertFalse(os.path.islink(self.junction))
2271
2272 def test_unlink_removes_junction(self):
2273 _winapi.CreateJunction(self.junction_target, self.junction)
2274 self.assertTrue(os.path.exists(self.junction))
2275
2276 os.unlink(self.junction)
2277 self.assertFalse(os.path.exists(self.junction))
2278
2279
Jason R. Coombs3a092862013-05-27 23:21:28 -04002280@support.skip_unless_symlink
2281class NonLocalSymlinkTests(unittest.TestCase):
2282
2283 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002284 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002285 Create this structure:
2286
2287 base
2288 \___ some_dir
2289 """
2290 os.makedirs('base/some_dir')
2291
2292 def tearDown(self):
2293 shutil.rmtree('base')
2294
2295 def test_directory_link_nonlocal(self):
2296 """
2297 The symlink target should resolve relative to the link, not relative
2298 to the current directory.
2299
2300 Then, link base/some_link -> base/some_dir and ensure that some_link
2301 is resolved as a directory.
2302
2303 In issue13772, it was discovered that directory detection failed if
2304 the symlink target was not specified relative to the current
2305 directory, which was a defect in the implementation.
2306 """
2307 src = os.path.join('base', 'some_link')
2308 os.symlink('some_dir', src)
2309 assert os.path.isdir(src)
2310
2311
Victor Stinnere8d51452010-08-19 01:05:19 +00002312class FSEncodingTests(unittest.TestCase):
2313 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002314 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2315 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002316
Victor Stinnere8d51452010-08-19 01:05:19 +00002317 def test_identity(self):
2318 # assert fsdecode(fsencode(x)) == x
2319 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2320 try:
2321 bytesfn = os.fsencode(fn)
2322 except UnicodeEncodeError:
2323 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002324 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002325
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002326
Brett Cannonefb00c02012-02-29 18:31:31 -05002327
2328class DeviceEncodingTests(unittest.TestCase):
2329
2330 def test_bad_fd(self):
2331 # Return None when an fd doesn't actually exist.
2332 self.assertIsNone(os.device_encoding(123456))
2333
Philip Jenveye308b7c2012-02-29 16:16:15 -08002334 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2335 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002336 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002337 def test_device_encoding(self):
2338 encoding = os.device_encoding(0)
2339 self.assertIsNotNone(encoding)
2340 self.assertTrue(codecs.lookup(encoding))
2341
2342
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002343class PidTests(unittest.TestCase):
2344 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2345 def test_getppid(self):
2346 p = subprocess.Popen([sys.executable, '-c',
2347 'import os; print(os.getppid())'],
2348 stdout=subprocess.PIPE)
2349 stdout, _ = p.communicate()
2350 # We are the parent of our subprocess
2351 self.assertEqual(int(stdout), os.getpid())
2352
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002353 def test_waitpid(self):
2354 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002355 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002356 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002357 status = os.waitpid(pid, 0)
2358 self.assertEqual(status, (pid, 0))
2359
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002360
Victor Stinner4659ccf2016-09-14 10:57:00 +02002361class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002362 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002363 self.exitcode = 17
2364
2365 filename = support.TESTFN
2366 self.addCleanup(support.unlink, filename)
2367
2368 if not with_env:
2369 code = 'import sys; sys.exit(%s)' % self.exitcode
2370 else:
2371 self.env = dict(os.environ)
2372 # create an unique key
2373 self.key = str(uuid.uuid4())
2374 self.env[self.key] = self.key
2375 # read the variable from os.environ to check that it exists
2376 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2377 % (self.key, self.exitcode))
2378
2379 with open(filename, "w") as fp:
2380 fp.write(code)
2381
Berker Peksag81816462016-09-15 20:19:47 +03002382 args = [sys.executable, filename]
2383 if use_bytes:
2384 args = [os.fsencode(a) for a in args]
2385 self.env = {os.fsencode(k): os.fsencode(v)
2386 for k, v in self.env.items()}
2387
2388 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002389
Berker Peksag4af23d72016-09-15 20:32:44 +03002390 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002391 def test_spawnl(self):
2392 args = self.create_args()
2393 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2394 self.assertEqual(exitcode, self.exitcode)
2395
Berker Peksag4af23d72016-09-15 20:32:44 +03002396 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002397 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002398 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002399 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2400 self.assertEqual(exitcode, self.exitcode)
2401
Berker Peksag4af23d72016-09-15 20:32:44 +03002402 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002403 def test_spawnlp(self):
2404 args = self.create_args()
2405 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2406 self.assertEqual(exitcode, self.exitcode)
2407
Berker Peksag4af23d72016-09-15 20:32:44 +03002408 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002409 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002410 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002411 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2412 self.assertEqual(exitcode, self.exitcode)
2413
Berker Peksag4af23d72016-09-15 20:32:44 +03002414 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002415 def test_spawnv(self):
2416 args = self.create_args()
2417 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2418 self.assertEqual(exitcode, self.exitcode)
2419
Berker Peksag4af23d72016-09-15 20:32:44 +03002420 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002421 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002422 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002423 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2424 self.assertEqual(exitcode, self.exitcode)
2425
Berker Peksag4af23d72016-09-15 20:32:44 +03002426 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002427 def test_spawnvp(self):
2428 args = self.create_args()
2429 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2430 self.assertEqual(exitcode, self.exitcode)
2431
Berker Peksag4af23d72016-09-15 20:32:44 +03002432 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002433 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002434 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002435 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2436 self.assertEqual(exitcode, self.exitcode)
2437
Berker Peksag4af23d72016-09-15 20:32:44 +03002438 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002439 def test_nowait(self):
2440 args = self.create_args()
2441 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2442 result = os.waitpid(pid, 0)
2443 self.assertEqual(result[0], pid)
2444 status = result[1]
2445 if hasattr(os, 'WIFEXITED'):
2446 self.assertTrue(os.WIFEXITED(status))
2447 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2448 else:
2449 self.assertEqual(status, self.exitcode << 8)
2450
Berker Peksag4af23d72016-09-15 20:32:44 +03002451 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002452 def test_spawnve_bytes(self):
2453 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2454 args = self.create_args(with_env=True, use_bytes=True)
2455 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2456 self.assertEqual(exitcode, self.exitcode)
2457
Steve Dower859fd7b2016-11-19 18:53:19 -08002458 @requires_os_func('spawnl')
2459 def test_spawnl_noargs(self):
2460 args = self.create_args()
2461 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002462 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002463
2464 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002465 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002466 args = self.create_args()
2467 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002468 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002469
2470 @requires_os_func('spawnv')
2471 def test_spawnv_noargs(self):
2472 args = self.create_args()
2473 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2474 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002475 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2476 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002477
2478 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002479 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002480 args = self.create_args()
2481 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2482 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002483 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2484 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002485
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002486 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002487 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002488
Ville Skyttä49b27342017-08-03 09:00:59 +03002489 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002490 newenv = os.environ.copy()
2491 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2492 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002493 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002494 except ValueError:
2495 pass
2496 else:
2497 self.assertEqual(exitcode, 127)
2498
Ville Skyttä49b27342017-08-03 09:00:59 +03002499 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002500 newenv = os.environ.copy()
2501 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2502 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002503 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002504 except ValueError:
2505 pass
2506 else:
2507 self.assertEqual(exitcode, 127)
2508
Ville Skyttä49b27342017-08-03 09:00:59 +03002509 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002510 newenv = os.environ.copy()
2511 newenv["FRUIT=ORANGE"] = "lemon"
2512 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002513 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002514 except ValueError:
2515 pass
2516 else:
2517 self.assertEqual(exitcode, 127)
2518
Ville Skyttä49b27342017-08-03 09:00:59 +03002519 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002520 filename = support.TESTFN
2521 self.addCleanup(support.unlink, filename)
2522 with open(filename, "w") as fp:
2523 fp.write('import sys, os\n'
2524 'if os.getenv("FRUIT") != "orange=lemon":\n'
2525 ' raise AssertionError')
2526 args = [sys.executable, filename]
2527 newenv = os.environ.copy()
2528 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002529 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002530 self.assertEqual(exitcode, 0)
2531
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002532 @requires_os_func('spawnve')
2533 def test_spawnve_invalid_env(self):
2534 self._test_invalid_env(os.spawnve)
2535
2536 @requires_os_func('spawnvpe')
2537 def test_spawnvpe_invalid_env(self):
2538 self._test_invalid_env(os.spawnvpe)
2539
Serhiy Storchaka77703942017-06-25 07:33:01 +03002540
Brian Curtin0151b8e2010-09-24 13:43:43 +00002541# The introduction of this TestCase caused at least two different errors on
2542# *nix buildbots. Temporarily skip this to let the buildbots move along.
2543@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002544@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2545class LoginTests(unittest.TestCase):
2546 def test_getlogin(self):
2547 user_name = os.getlogin()
2548 self.assertNotEqual(len(user_name), 0)
2549
2550
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002551@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2552 "needs os.getpriority and os.setpriority")
2553class ProgramPriorityTests(unittest.TestCase):
2554 """Tests for os.getpriority() and os.setpriority()."""
2555
2556 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002557
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002558 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2559 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2560 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002561 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2562 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002563 raise unittest.SkipTest("unable to reliably test setpriority "
2564 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002565 else:
2566 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002567 finally:
2568 try:
2569 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2570 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002571 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002572 raise
2573
2574
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002575class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002576
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002577 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002578
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002579 def __init__(self, conn):
2580 asynchat.async_chat.__init__(self, conn)
2581 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002582 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002583 self.closed = False
2584 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002585
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002586 def handle_read(self):
2587 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002588 if self.accumulate:
2589 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002590
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002591 def get_data(self):
2592 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002593
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002595 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002596 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002597
2598 def handle_error(self):
2599 raise
2600
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002601 def __init__(self, address):
2602 threading.Thread.__init__(self)
2603 asyncore.dispatcher.__init__(self)
2604 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2605 self.bind(address)
2606 self.listen(5)
2607 self.host, self.port = self.socket.getsockname()[:2]
2608 self.handler_instance = None
2609 self._active = False
2610 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002611
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002612 # --- public API
2613
2614 @property
2615 def running(self):
2616 return self._active
2617
2618 def start(self):
2619 assert not self.running
2620 self.__flag = threading.Event()
2621 threading.Thread.start(self)
2622 self.__flag.wait()
2623
2624 def stop(self):
2625 assert self.running
2626 self._active = False
2627 self.join()
2628
2629 def wait(self):
2630 # wait for handler connection to be closed, then stop the server
2631 while not getattr(self.handler_instance, "closed", False):
2632 time.sleep(0.001)
2633 self.stop()
2634
2635 # --- internals
2636
2637 def run(self):
2638 self._active = True
2639 self.__flag.set()
2640 while self._active and asyncore.socket_map:
2641 self._active_lock.acquire()
2642 asyncore.loop(timeout=0.001, count=1)
2643 self._active_lock.release()
2644 asyncore.close_all()
2645
2646 def handle_accept(self):
2647 conn, addr = self.accept()
2648 self.handler_instance = self.Handler(conn)
2649
2650 def handle_connect(self):
2651 self.close()
2652 handle_read = handle_connect
2653
2654 def writable(self):
2655 return 0
2656
2657 def handle_error(self):
2658 raise
2659
2660
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002661@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2662class TestSendfile(unittest.TestCase):
2663
Victor Stinner8c663fd2017-11-08 14:44:44 -08002664 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002665 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002666 not sys.platform.startswith("solaris") and \
2667 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002668 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2669 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002670 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2671 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002672
2673 @classmethod
2674 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002675 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002676 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002677
2678 @classmethod
2679 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002680 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002681 support.unlink(support.TESTFN)
2682
2683 def setUp(self):
2684 self.server = SendfileTestServer((support.HOST, 0))
2685 self.server.start()
2686 self.client = socket.socket()
2687 self.client.connect((self.server.host, self.server.port))
2688 self.client.settimeout(1)
2689 # synchronize by waiting for "220 ready" response
2690 self.client.recv(1024)
2691 self.sockno = self.client.fileno()
2692 self.file = open(support.TESTFN, 'rb')
2693 self.fileno = self.file.fileno()
2694
2695 def tearDown(self):
2696 self.file.close()
2697 self.client.close()
2698 if self.server.running:
2699 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002700 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002701
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002702 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002703 """A higher level wrapper representing how an application is
2704 supposed to use sendfile().
2705 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002706 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002707 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002708 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002709 except OSError as err:
2710 if err.errno == errno.ECONNRESET:
2711 # disconnected
2712 raise
2713 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2714 # we have to retry send data
2715 continue
2716 else:
2717 raise
2718
2719 def test_send_whole_file(self):
2720 # normal send
2721 total_sent = 0
2722 offset = 0
2723 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002724 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002725 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2726 if sent == 0:
2727 break
2728 offset += sent
2729 total_sent += sent
2730 self.assertTrue(sent <= nbytes)
2731 self.assertEqual(offset, total_sent)
2732
2733 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002734 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002735 self.client.close()
2736 self.server.wait()
2737 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002738 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002739 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002740
2741 def test_send_at_certain_offset(self):
2742 # start sending a file at a certain offset
2743 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002744 offset = len(self.DATA) // 2
2745 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002746 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002747 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002748 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2749 if sent == 0:
2750 break
2751 offset += sent
2752 total_sent += sent
2753 self.assertTrue(sent <= nbytes)
2754
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002755 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002756 self.client.close()
2757 self.server.wait()
2758 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002759 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002760 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002761 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002762 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002763
2764 def test_offset_overflow(self):
2765 # specify an offset > file size
2766 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002767 try:
2768 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2769 except OSError as e:
2770 # Solaris can raise EINVAL if offset >= file length, ignore.
2771 if e.errno != errno.EINVAL:
2772 raise
2773 else:
2774 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002775 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002776 self.client.close()
2777 self.server.wait()
2778 data = self.server.handler_instance.get_data()
2779 self.assertEqual(data, b'')
2780
2781 def test_invalid_offset(self):
2782 with self.assertRaises(OSError) as cm:
2783 os.sendfile(self.sockno, self.fileno, -1, 4096)
2784 self.assertEqual(cm.exception.errno, errno.EINVAL)
2785
Martin Panterbf19d162015-09-09 01:01:13 +00002786 def test_keywords(self):
2787 # Keyword arguments should be supported
2788 os.sendfile(out=self.sockno, offset=0, count=4096,
2789 **{'in': self.fileno})
2790 if self.SUPPORT_HEADERS_TRAILERS:
2791 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002792 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002793
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002794 # --- headers / trailers tests
2795
Serhiy Storchaka43767632013-11-03 21:31:38 +02002796 @requires_headers_trailers
2797 def test_headers(self):
2798 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002799 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02002800 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002801 headers=[b"x" * 512, b"y" * 256])
2802 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002803 total_sent += sent
2804 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002805 while total_sent < len(expected_data):
2806 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002807 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2808 offset, nbytes)
2809 if sent == 0:
2810 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002811 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002812 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002813 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002814
Serhiy Storchaka43767632013-11-03 21:31:38 +02002815 self.assertEqual(total_sent, len(expected_data))
2816 self.client.close()
2817 self.server.wait()
2818 data = self.server.handler_instance.get_data()
2819 self.assertEqual(hash(data), hash(expected_data))
2820
2821 @requires_headers_trailers
2822 def test_trailers(self):
2823 TESTFN2 = support.TESTFN + "2"
2824 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002825
2826 self.addCleanup(support.unlink, TESTFN2)
2827 create_file(TESTFN2, file_data)
2828
2829 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002830 os.sendfile(self.sockno, f.fileno(), 0, 5,
2831 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002832 self.client.close()
2833 self.server.wait()
2834 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002835 self.assertEqual(data, b"abcde123456789")
2836
2837 @requires_headers_trailers
2838 @requires_32b
2839 def test_headers_overflow_32bits(self):
2840 self.server.handler_instance.accumulate = False
2841 with self.assertRaises(OSError) as cm:
2842 os.sendfile(self.sockno, self.fileno, 0, 0,
2843 headers=[b"x" * 2**16] * 2**15)
2844 self.assertEqual(cm.exception.errno, errno.EINVAL)
2845
2846 @requires_headers_trailers
2847 @requires_32b
2848 def test_trailers_overflow_32bits(self):
2849 self.server.handler_instance.accumulate = False
2850 with self.assertRaises(OSError) as cm:
2851 os.sendfile(self.sockno, self.fileno, 0, 0,
2852 trailers=[b"x" * 2**16] * 2**15)
2853 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002854
Serhiy Storchaka43767632013-11-03 21:31:38 +02002855 @requires_headers_trailers
2856 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2857 'test needs os.SF_NODISKIO')
2858 def test_flags(self):
2859 try:
2860 os.sendfile(self.sockno, self.fileno, 0, 4096,
2861 flags=os.SF_NODISKIO)
2862 except OSError as err:
2863 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2864 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002865
2866
Larry Hastings9cf065c2012-06-22 16:30:09 -07002867def supports_extended_attributes():
2868 if not hasattr(os, "setxattr"):
2869 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002870
Larry Hastings9cf065c2012-06-22 16:30:09 -07002871 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002872 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002873 try:
2874 os.setxattr(fp.fileno(), b"user.test", b"")
2875 except OSError:
2876 return False
2877 finally:
2878 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002879
2880 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002881
2882
2883@unittest.skipUnless(supports_extended_attributes(),
2884 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002885# Kernels < 2.6.39 don't respect setxattr flags.
2886@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002887class ExtendedAttributeTests(unittest.TestCase):
2888
Larry Hastings9cf065c2012-06-22 16:30:09 -07002889 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002890 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002891 self.addCleanup(support.unlink, fn)
2892 create_file(fn)
2893
Benjamin Peterson799bd802011-08-31 22:15:17 -04002894 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002895 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002896 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002897
Victor Stinnerf12e5062011-10-16 22:12:03 +02002898 init_xattr = listxattr(fn)
2899 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002900
Larry Hastings9cf065c2012-06-22 16:30:09 -07002901 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002902 xattr = set(init_xattr)
2903 xattr.add("user.test")
2904 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002905 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2906 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2907 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002908
Benjamin Peterson799bd802011-08-31 22:15:17 -04002909 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002910 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002911 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002912
Benjamin Peterson799bd802011-08-31 22:15:17 -04002913 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002914 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002915 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002916
Larry Hastings9cf065c2012-06-22 16:30:09 -07002917 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002918 xattr.add("user.test2")
2919 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002920 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002921
Benjamin Peterson799bd802011-08-31 22:15:17 -04002922 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002923 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002924 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002925
Victor Stinnerf12e5062011-10-16 22:12:03 +02002926 xattr.remove("user.test")
2927 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002928 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2929 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2930 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2931 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002932 many = sorted("user.test{}".format(i) for i in range(100))
2933 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002934 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002935 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002936
Larry Hastings9cf065c2012-06-22 16:30:09 -07002937 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002938 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002939 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002940
2941 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2942 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002943
2944 def test_simple(self):
2945 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2946 os.listxattr)
2947
2948 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002949 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2950 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002951
2952 def test_fds(self):
2953 def getxattr(path, *args):
2954 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002955 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002956 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002957 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002958 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002959 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002960 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002961 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002962 def listxattr(path, *args):
2963 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002964 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002965 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2966
2967
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002968@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2969class TermsizeTests(unittest.TestCase):
2970 def test_does_not_crash(self):
2971 """Check if get_terminal_size() returns a meaningful value.
2972
2973 There's no easy portable way to actually check the size of the
2974 terminal, so let's check if it returns something sensible instead.
2975 """
2976 try:
2977 size = os.get_terminal_size()
2978 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002979 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002980 # Under win32 a generic OSError can be thrown if the
2981 # handle cannot be retrieved
2982 self.skipTest("failed to query terminal size")
2983 raise
2984
Antoine Pitroucfade362012-02-08 23:48:59 +01002985 self.assertGreaterEqual(size.columns, 0)
2986 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002987
2988 def test_stty_match(self):
2989 """Check if stty returns the same results
2990
2991 stty actually tests stdin, so get_terminal_size is invoked on
2992 stdin explicitly. If stty succeeded, then get_terminal_size()
2993 should work too.
2994 """
2995 try:
2996 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01002997 except (FileNotFoundError, subprocess.CalledProcessError,
2998 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002999 self.skipTest("stty invocation failed")
3000 expected = (int(size[1]), int(size[0])) # reversed order
3001
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003002 try:
3003 actual = os.get_terminal_size(sys.__stdin__.fileno())
3004 except OSError as e:
3005 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3006 # Under win32 a generic OSError can be thrown if the
3007 # handle cannot be retrieved
3008 self.skipTest("failed to query terminal size")
3009 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003010 self.assertEqual(expected, actual)
3011
3012
Victor Stinner292c8352012-10-30 02:17:38 +01003013class OSErrorTests(unittest.TestCase):
3014 def setUp(self):
3015 class Str(str):
3016 pass
3017
Victor Stinnerafe17062012-10-31 22:47:43 +01003018 self.bytes_filenames = []
3019 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003020 if support.TESTFN_UNENCODABLE is not None:
3021 decoded = support.TESTFN_UNENCODABLE
3022 else:
3023 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003024 self.unicode_filenames.append(decoded)
3025 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003026 if support.TESTFN_UNDECODABLE is not None:
3027 encoded = support.TESTFN_UNDECODABLE
3028 else:
3029 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003030 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003031 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003032 self.bytes_filenames.append(memoryview(encoded))
3033
3034 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003035
3036 def test_oserror_filename(self):
3037 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003038 (self.filenames, os.chdir,),
3039 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003040 (self.filenames, os.lstat,),
3041 (self.filenames, os.open, os.O_RDONLY),
3042 (self.filenames, os.rmdir,),
3043 (self.filenames, os.stat,),
3044 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003045 ]
3046 if sys.platform == "win32":
3047 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003048 (self.bytes_filenames, os.rename, b"dst"),
3049 (self.bytes_filenames, os.replace, b"dst"),
3050 (self.unicode_filenames, os.rename, "dst"),
3051 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003052 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003053 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003054 else:
3055 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003056 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003057 (self.filenames, os.rename, "dst"),
3058 (self.filenames, os.replace, "dst"),
3059 ))
3060 if hasattr(os, "chown"):
3061 funcs.append((self.filenames, os.chown, 0, 0))
3062 if hasattr(os, "lchown"):
3063 funcs.append((self.filenames, os.lchown, 0, 0))
3064 if hasattr(os, "truncate"):
3065 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003066 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003067 funcs.append((self.filenames, os.chflags, 0))
3068 if hasattr(os, "lchflags"):
3069 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003070 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003071 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003072 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003073 if sys.platform == "win32":
3074 funcs.append((self.bytes_filenames, os.link, b"dst"))
3075 funcs.append((self.unicode_filenames, os.link, "dst"))
3076 else:
3077 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003078 if hasattr(os, "listxattr"):
3079 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003080 (self.filenames, os.listxattr,),
3081 (self.filenames, os.getxattr, "user.test"),
3082 (self.filenames, os.setxattr, "user.test", b'user'),
3083 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003084 ))
3085 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003086 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003087 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003088 if sys.platform == "win32":
3089 funcs.append((self.unicode_filenames, os.readlink,))
3090 else:
3091 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003092
Steve Dowercc16be82016-09-08 10:35:16 -07003093
Victor Stinnerafe17062012-10-31 22:47:43 +01003094 for filenames, func, *func_args in funcs:
3095 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003096 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003097 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003098 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003099 else:
3100 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3101 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003102 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003103 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003104 except UnicodeDecodeError:
3105 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003106 else:
3107 self.fail("No exception thrown by {}".format(func))
3108
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003109class CPUCountTests(unittest.TestCase):
3110 def test_cpu_count(self):
3111 cpus = os.cpu_count()
3112 if cpus is not None:
3113 self.assertIsInstance(cpus, int)
3114 self.assertGreater(cpus, 0)
3115 else:
3116 self.skipTest("Could not determine the number of CPUs")
3117
Victor Stinnerdaf45552013-08-28 00:53:59 +02003118
3119class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003120 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003121 fd = os.open(__file__, os.O_RDONLY)
3122 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003123 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003124
Victor Stinnerdaf45552013-08-28 00:53:59 +02003125 os.set_inheritable(fd, True)
3126 self.assertEqual(os.get_inheritable(fd), True)
3127
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003128 @unittest.skipIf(fcntl is None, "need fcntl")
3129 def test_get_inheritable_cloexec(self):
3130 fd = os.open(__file__, os.O_RDONLY)
3131 self.addCleanup(os.close, fd)
3132 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003133
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003134 # clear FD_CLOEXEC flag
3135 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3136 flags &= ~fcntl.FD_CLOEXEC
3137 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003138
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003139 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003140
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003141 @unittest.skipIf(fcntl is None, "need fcntl")
3142 def test_set_inheritable_cloexec(self):
3143 fd = os.open(__file__, os.O_RDONLY)
3144 self.addCleanup(os.close, fd)
3145 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3146 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003147
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003148 os.set_inheritable(fd, True)
3149 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3150 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003151
Victor Stinnerdaf45552013-08-28 00:53:59 +02003152 def test_open(self):
3153 fd = os.open(__file__, os.O_RDONLY)
3154 self.addCleanup(os.close, fd)
3155 self.assertEqual(os.get_inheritable(fd), False)
3156
3157 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3158 def test_pipe(self):
3159 rfd, wfd = os.pipe()
3160 self.addCleanup(os.close, rfd)
3161 self.addCleanup(os.close, wfd)
3162 self.assertEqual(os.get_inheritable(rfd), False)
3163 self.assertEqual(os.get_inheritable(wfd), False)
3164
3165 def test_dup(self):
3166 fd1 = os.open(__file__, os.O_RDONLY)
3167 self.addCleanup(os.close, fd1)
3168
3169 fd2 = os.dup(fd1)
3170 self.addCleanup(os.close, fd2)
3171 self.assertEqual(os.get_inheritable(fd2), False)
3172
3173 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3174 def test_dup2(self):
3175 fd = os.open(__file__, os.O_RDONLY)
3176 self.addCleanup(os.close, fd)
3177
3178 # inheritable by default
3179 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003180 self.addCleanup(os.close, fd2)
3181 self.assertEqual(os.dup2(fd, fd2), fd2)
3182 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003183
3184 # force non-inheritable
3185 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003186 self.addCleanup(os.close, fd3)
3187 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3188 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003189
3190 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3191 def test_openpty(self):
3192 master_fd, slave_fd = os.openpty()
3193 self.addCleanup(os.close, master_fd)
3194 self.addCleanup(os.close, slave_fd)
3195 self.assertEqual(os.get_inheritable(master_fd), False)
3196 self.assertEqual(os.get_inheritable(slave_fd), False)
3197
3198
Brett Cannon3f9183b2016-08-26 14:44:48 -07003199class PathTConverterTests(unittest.TestCase):
3200 # tuples of (function name, allows fd arguments, additional arguments to
3201 # function, cleanup function)
3202 functions = [
3203 ('stat', True, (), None),
3204 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003205 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003206 ('chflags', False, (0,), None),
3207 ('lchflags', False, (0,), None),
3208 ('open', False, (0,), getattr(os, 'close', None)),
3209 ]
3210
3211 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003212 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003213 if os.name == 'nt':
3214 bytes_fspath = bytes_filename = None
3215 else:
3216 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003217 bytes_fspath = FakePath(bytes_filename)
3218 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003219 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003220 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003221
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003222 int_fspath = FakePath(fd)
3223 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003224
3225 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3226 with self.subTest(name=name):
3227 try:
3228 fn = getattr(os, name)
3229 except AttributeError:
3230 continue
3231
Brett Cannon8f96a302016-08-26 19:30:11 -07003232 for path in (str_filename, bytes_filename, str_fspath,
3233 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003234 if path is None:
3235 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003236 with self.subTest(name=name, path=path):
3237 result = fn(path, *extra_args)
3238 if cleanup_fn is not None:
3239 cleanup_fn(result)
3240
3241 with self.assertRaisesRegex(
3242 TypeError, 'should be string, bytes'):
3243 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003244
3245 if allow_fd:
3246 result = fn(fd, *extra_args) # should not fail
3247 if cleanup_fn is not None:
3248 cleanup_fn(result)
3249 else:
3250 with self.assertRaisesRegex(
3251 TypeError,
3252 'os.PathLike'):
3253 fn(fd, *extra_args)
3254
3255
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003256@unittest.skipUnless(hasattr(os, 'get_blocking'),
3257 'needs os.get_blocking() and os.set_blocking()')
3258class BlockingTests(unittest.TestCase):
3259 def test_blocking(self):
3260 fd = os.open(__file__, os.O_RDONLY)
3261 self.addCleanup(os.close, fd)
3262 self.assertEqual(os.get_blocking(fd), True)
3263
3264 os.set_blocking(fd, False)
3265 self.assertEqual(os.get_blocking(fd), False)
3266
3267 os.set_blocking(fd, True)
3268 self.assertEqual(os.get_blocking(fd), True)
3269
3270
Yury Selivanov97e2e062014-09-26 12:33:06 -04003271
3272class ExportsTests(unittest.TestCase):
3273 def test_os_all(self):
3274 self.assertIn('open', os.__all__)
3275 self.assertIn('walk', os.__all__)
3276
3277
Victor Stinner6036e442015-03-08 01:58:04 +01003278class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003279 check_no_resource_warning = support.check_no_resource_warning
3280
Victor Stinner6036e442015-03-08 01:58:04 +01003281 def setUp(self):
3282 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003283 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003284 self.addCleanup(support.rmtree, self.path)
3285 os.mkdir(self.path)
3286
3287 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003288 path = self.bytes_path if isinstance(name, bytes) else self.path
3289 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003290 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003291 return filename
3292
3293 def get_entries(self, names):
3294 entries = dict((entry.name, entry)
3295 for entry in os.scandir(self.path))
3296 self.assertEqual(sorted(entries.keys()), names)
3297 return entries
3298
3299 def assert_stat_equal(self, stat1, stat2, skip_fields):
3300 if skip_fields:
3301 for attr in dir(stat1):
3302 if not attr.startswith("st_"):
3303 continue
3304 if attr in ("st_dev", "st_ino", "st_nlink"):
3305 continue
3306 self.assertEqual(getattr(stat1, attr),
3307 getattr(stat2, attr),
3308 (stat1, stat2, attr))
3309 else:
3310 self.assertEqual(stat1, stat2)
3311
3312 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003313 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003314 self.assertEqual(entry.name, name)
3315 self.assertEqual(entry.path, os.path.join(self.path, name))
3316 self.assertEqual(entry.inode(),
3317 os.stat(entry.path, follow_symlinks=False).st_ino)
3318
3319 entry_stat = os.stat(entry.path)
3320 self.assertEqual(entry.is_dir(),
3321 stat.S_ISDIR(entry_stat.st_mode))
3322 self.assertEqual(entry.is_file(),
3323 stat.S_ISREG(entry_stat.st_mode))
3324 self.assertEqual(entry.is_symlink(),
3325 os.path.islink(entry.path))
3326
3327 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3328 self.assertEqual(entry.is_dir(follow_symlinks=False),
3329 stat.S_ISDIR(entry_lstat.st_mode))
3330 self.assertEqual(entry.is_file(follow_symlinks=False),
3331 stat.S_ISREG(entry_lstat.st_mode))
3332
3333 self.assert_stat_equal(entry.stat(),
3334 entry_stat,
3335 os.name == 'nt' and not is_symlink)
3336 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3337 entry_lstat,
3338 os.name == 'nt')
3339
3340 def test_attributes(self):
3341 link = hasattr(os, 'link')
3342 symlink = support.can_symlink()
3343
3344 dirname = os.path.join(self.path, "dir")
3345 os.mkdir(dirname)
3346 filename = self.create_file("file.txt")
3347 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003348 try:
3349 os.link(filename, os.path.join(self.path, "link_file.txt"))
3350 except PermissionError as e:
3351 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003352 if symlink:
3353 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3354 target_is_directory=True)
3355 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3356
3357 names = ['dir', 'file.txt']
3358 if link:
3359 names.append('link_file.txt')
3360 if symlink:
3361 names.extend(('symlink_dir', 'symlink_file.txt'))
3362 entries = self.get_entries(names)
3363
3364 entry = entries['dir']
3365 self.check_entry(entry, 'dir', True, False, False)
3366
3367 entry = entries['file.txt']
3368 self.check_entry(entry, 'file.txt', False, True, False)
3369
3370 if link:
3371 entry = entries['link_file.txt']
3372 self.check_entry(entry, 'link_file.txt', False, True, False)
3373
3374 if symlink:
3375 entry = entries['symlink_dir']
3376 self.check_entry(entry, 'symlink_dir', True, False, True)
3377
3378 entry = entries['symlink_file.txt']
3379 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3380
3381 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003382 path = self.bytes_path if isinstance(name, bytes) else self.path
3383 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003384 self.assertEqual(len(entries), 1)
3385
3386 entry = entries[0]
3387 self.assertEqual(entry.name, name)
3388 return entry
3389
Brett Cannon96881cd2016-06-10 14:37:21 -07003390 def create_file_entry(self, name='file.txt'):
3391 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003392 return self.get_entry(os.path.basename(filename))
3393
3394 def test_current_directory(self):
3395 filename = self.create_file()
3396 old_dir = os.getcwd()
3397 try:
3398 os.chdir(self.path)
3399
3400 # call scandir() without parameter: it must list the content
3401 # of the current directory
3402 entries = dict((entry.name, entry) for entry in os.scandir())
3403 self.assertEqual(sorted(entries.keys()),
3404 [os.path.basename(filename)])
3405 finally:
3406 os.chdir(old_dir)
3407
3408 def test_repr(self):
3409 entry = self.create_file_entry()
3410 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3411
Brett Cannon96881cd2016-06-10 14:37:21 -07003412 def test_fspath_protocol(self):
3413 entry = self.create_file_entry()
3414 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3415
3416 def test_fspath_protocol_bytes(self):
3417 bytes_filename = os.fsencode('bytesfile.txt')
3418 bytes_entry = self.create_file_entry(name=bytes_filename)
3419 fspath = os.fspath(bytes_entry)
3420 self.assertIsInstance(fspath, bytes)
3421 self.assertEqual(fspath,
3422 os.path.join(os.fsencode(self.path),bytes_filename))
3423
Victor Stinner6036e442015-03-08 01:58:04 +01003424 def test_removed_dir(self):
3425 path = os.path.join(self.path, 'dir')
3426
3427 os.mkdir(path)
3428 entry = self.get_entry('dir')
3429 os.rmdir(path)
3430
3431 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3432 if os.name == 'nt':
3433 self.assertTrue(entry.is_dir())
3434 self.assertFalse(entry.is_file())
3435 self.assertFalse(entry.is_symlink())
3436 if os.name == 'nt':
3437 self.assertRaises(FileNotFoundError, entry.inode)
3438 # don't fail
3439 entry.stat()
3440 entry.stat(follow_symlinks=False)
3441 else:
3442 self.assertGreater(entry.inode(), 0)
3443 self.assertRaises(FileNotFoundError, entry.stat)
3444 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3445
3446 def test_removed_file(self):
3447 entry = self.create_file_entry()
3448 os.unlink(entry.path)
3449
3450 self.assertFalse(entry.is_dir())
3451 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3452 if os.name == 'nt':
3453 self.assertTrue(entry.is_file())
3454 self.assertFalse(entry.is_symlink())
3455 if os.name == 'nt':
3456 self.assertRaises(FileNotFoundError, entry.inode)
3457 # don't fail
3458 entry.stat()
3459 entry.stat(follow_symlinks=False)
3460 else:
3461 self.assertGreater(entry.inode(), 0)
3462 self.assertRaises(FileNotFoundError, entry.stat)
3463 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3464
3465 def test_broken_symlink(self):
3466 if not support.can_symlink():
3467 return self.skipTest('cannot create symbolic link')
3468
3469 filename = self.create_file("file.txt")
3470 os.symlink(filename,
3471 os.path.join(self.path, "symlink.txt"))
3472 entries = self.get_entries(['file.txt', 'symlink.txt'])
3473 entry = entries['symlink.txt']
3474 os.unlink(filename)
3475
3476 self.assertGreater(entry.inode(), 0)
3477 self.assertFalse(entry.is_dir())
3478 self.assertFalse(entry.is_file()) # broken symlink returns False
3479 self.assertFalse(entry.is_dir(follow_symlinks=False))
3480 self.assertFalse(entry.is_file(follow_symlinks=False))
3481 self.assertTrue(entry.is_symlink())
3482 self.assertRaises(FileNotFoundError, entry.stat)
3483 # don't fail
3484 entry.stat(follow_symlinks=False)
3485
3486 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003487 self.create_file("file.txt")
3488
3489 path_bytes = os.fsencode(self.path)
3490 entries = list(os.scandir(path_bytes))
3491 self.assertEqual(len(entries), 1, entries)
3492 entry = entries[0]
3493
3494 self.assertEqual(entry.name, b'file.txt')
3495 self.assertEqual(entry.path,
3496 os.fsencode(os.path.join(self.path, 'file.txt')))
3497
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003498 def test_bytes_like(self):
3499 self.create_file("file.txt")
3500
3501 for cls in bytearray, memoryview:
3502 path_bytes = cls(os.fsencode(self.path))
3503 with self.assertWarns(DeprecationWarning):
3504 entries = list(os.scandir(path_bytes))
3505 self.assertEqual(len(entries), 1, entries)
3506 entry = entries[0]
3507
3508 self.assertEqual(entry.name, b'file.txt')
3509 self.assertEqual(entry.path,
3510 os.fsencode(os.path.join(self.path, 'file.txt')))
3511 self.assertIs(type(entry.name), bytes)
3512 self.assertIs(type(entry.path), bytes)
3513
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003514 @unittest.skipUnless(os.listdir in os.supports_fd,
3515 'fd support for listdir required for this test.')
3516 def test_fd(self):
3517 self.assertIn(os.scandir, os.supports_fd)
3518 self.create_file('file.txt')
3519 expected_names = ['file.txt']
3520 if support.can_symlink():
3521 os.symlink('file.txt', os.path.join(self.path, 'link'))
3522 expected_names.append('link')
3523
3524 fd = os.open(self.path, os.O_RDONLY)
3525 try:
3526 with os.scandir(fd) as it:
3527 entries = list(it)
3528 names = [entry.name for entry in entries]
3529 self.assertEqual(sorted(names), expected_names)
3530 self.assertEqual(names, os.listdir(fd))
3531 for entry in entries:
3532 self.assertEqual(entry.path, entry.name)
3533 self.assertEqual(os.fspath(entry), entry.name)
3534 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3535 if os.stat in os.supports_dir_fd:
3536 st = os.stat(entry.name, dir_fd=fd)
3537 self.assertEqual(entry.stat(), st)
3538 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3539 self.assertEqual(entry.stat(follow_symlinks=False), st)
3540 finally:
3541 os.close(fd)
3542
Victor Stinner6036e442015-03-08 01:58:04 +01003543 def test_empty_path(self):
3544 self.assertRaises(FileNotFoundError, os.scandir, '')
3545
3546 def test_consume_iterator_twice(self):
3547 self.create_file("file.txt")
3548 iterator = os.scandir(self.path)
3549
3550 entries = list(iterator)
3551 self.assertEqual(len(entries), 1, entries)
3552
3553 # check than consuming the iterator twice doesn't raise exception
3554 entries2 = list(iterator)
3555 self.assertEqual(len(entries2), 0, entries2)
3556
3557 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003558 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003559 self.assertRaises(TypeError, os.scandir, obj)
3560
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003561 def test_close(self):
3562 self.create_file("file.txt")
3563 self.create_file("file2.txt")
3564 iterator = os.scandir(self.path)
3565 next(iterator)
3566 iterator.close()
3567 # multiple closes
3568 iterator.close()
3569 with self.check_no_resource_warning():
3570 del iterator
3571
3572 def test_context_manager(self):
3573 self.create_file("file.txt")
3574 self.create_file("file2.txt")
3575 with os.scandir(self.path) as iterator:
3576 next(iterator)
3577 with self.check_no_resource_warning():
3578 del iterator
3579
3580 def test_context_manager_close(self):
3581 self.create_file("file.txt")
3582 self.create_file("file2.txt")
3583 with os.scandir(self.path) as iterator:
3584 next(iterator)
3585 iterator.close()
3586
3587 def test_context_manager_exception(self):
3588 self.create_file("file.txt")
3589 self.create_file("file2.txt")
3590 with self.assertRaises(ZeroDivisionError):
3591 with os.scandir(self.path) as iterator:
3592 next(iterator)
3593 1/0
3594 with self.check_no_resource_warning():
3595 del iterator
3596
3597 def test_resource_warning(self):
3598 self.create_file("file.txt")
3599 self.create_file("file2.txt")
3600 iterator = os.scandir(self.path)
3601 next(iterator)
3602 with self.assertWarns(ResourceWarning):
3603 del iterator
3604 support.gc_collect()
3605 # exhausted iterator
3606 iterator = os.scandir(self.path)
3607 list(iterator)
3608 with self.check_no_resource_warning():
3609 del iterator
3610
Victor Stinner6036e442015-03-08 01:58:04 +01003611
Ethan Furmancdc08792016-06-02 15:06:09 -07003612class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003613
3614 # Abstracted so it can be overridden to test pure Python implementation
3615 # if a C version is provided.
3616 fspath = staticmethod(os.fspath)
3617
Ethan Furmancdc08792016-06-02 15:06:09 -07003618 def test_return_bytes(self):
3619 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003620 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003621
3622 def test_return_string(self):
3623 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003624 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003625
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003626 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003627 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003628 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003629
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003630 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003631 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3632 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3633
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003634 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003635 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3636 self.assertTrue(issubclass(FakePath, os.PathLike))
3637 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003638
Ethan Furmancdc08792016-06-02 15:06:09 -07003639 def test_garbage_in_exception_out(self):
3640 vapor = type('blah', (), {})
3641 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003642 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003643
3644 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003645 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003646
Brett Cannon044283a2016-07-15 10:41:49 -07003647 def test_bad_pathlike(self):
3648 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003649 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003650 # __fspath__ attribute that is not callable.
3651 c = type('foo', (), {})
3652 c.__fspath__ = 1
3653 self.assertRaises(TypeError, self.fspath, c())
3654 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003655 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003656 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003657
Victor Stinnerc29b5852017-11-02 07:28:27 -07003658
3659class TimesTests(unittest.TestCase):
3660 def test_times(self):
3661 times = os.times()
3662 self.assertIsInstance(times, os.times_result)
3663
3664 for field in ('user', 'system', 'children_user', 'children_system',
3665 'elapsed'):
3666 value = getattr(times, field)
3667 self.assertIsInstance(value, float)
3668
3669 if os.name == 'nt':
3670 self.assertEqual(times.children_user, 0)
3671 self.assertEqual(times.children_system, 0)
3672 self.assertEqual(times.elapsed, 0)
3673
3674
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003675# Only test if the C version is provided, otherwise TestPEP519 already tested
3676# the pure Python implementation.
3677if hasattr(os, "_fspath"):
3678 class TestPEP519PurePython(TestPEP519):
3679
3680 """Explicitly test the pure Python implementation of os.fspath()."""
3681
3682 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003683
3684
Fred Drake2e2be372001-09-20 21:33:42 +00003685if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003686 unittest.main()