blob: 3f6e48f0c8e69c53f451b33fc8958a1f824b045f [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
# Optional modules: each name is bound to None when its import fails.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
# ``groups``: ids of the groups that list the current user as a member, plus
# the process gid when os.getgid() exists; empty when ``grp`` is unavailable.
try:
    import grp
    # Resolve the user name once instead of once per group entry.
    _current_user = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
# ``all_users``: the uid of every entry returned by pwd.getpwall(); empty when
# ``pwd`` or ``pwd.getpwall`` is unavailable.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    # Without _testcapi both limits fall back to sys.maxsize.
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020064from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = False
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = (os.getgid() == 0)
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os`` has attribute *name*."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
86
87
def create_file(filename, content=b'content'):
    """Create *filename* and write *content* to it, unbuffered.

    The file is opened in exclusive mode ("xb"), so an already existing
    *filename* makes open() raise instead of being overwritten.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
91
92
Thomas Wouters0e3f5912006-08-11 14:57:12 +000093# Tests creating TESTFN
94class FileTests(unittest.TestCase):
95 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000096 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000097 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000098 tearDown = setUp
99
100 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000101 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000102 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000103 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000104
    def test_closerange(self):
        # os.closerange() must close an open descriptor and silently ignore
        # one that is already closed.
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                # Drop the lower fd and duplicate again, hoping that the
                # new pair ends up adjacent.
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            # ``second`` is closed up front so that the range below contains
            # exactly one open and one closed descriptor.
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        # Writing to ``first`` must now fail because it has been closed.
        self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125
    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call (bad destination type) must leave the
        # reference count of the source path unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)
133
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000134 def test_read(self):
135 with open(support.TESTFN, "w+b") as fobj:
136 fobj.write(b"spam")
137 fobj.flush()
138 fd = fobj.fileno()
139 os.lseek(fd, 0, 0)
140 s = os.read(fd, 4)
141 self.assertEqual(type(s), bytes)
142 self.assertEqual(s, b"spam")
143
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200144 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200145 # Skip the test on 32-bit platforms: the number of bytes must fit in a
146 # Py_ssize_t type
147 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
148 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200149 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
150 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200151 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100152 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200153
154 # Issue #21932: Make sure that os.read() does not raise an
155 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200156 with open(support.TESTFN, "rb") as fp:
157 data = os.read(fp.fileno(), size)
158
Victor Stinner8c663fd2017-11-08 14:44:44 -0800159 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200160 # operating system is free to return less bytes than requested.
161 self.assertEqual(data, b'test')
162
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000163 def test_write(self):
164 # os.write() accepts bytes- and buffer-like objects but not strings
165 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
166 self.assertRaises(TypeError, os.write, fd, "beans")
167 os.write(fd, b"bacon\n")
168 os.write(fd, bytearray(b"eggs\n"))
169 os.write(fd, memoryview(b"spam\n"))
170 os.close(fd)
171 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000172 self.assertEqual(fobj.read().splitlines(),
173 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000174
Victor Stinnere0daff12011-03-20 23:36:35 +0100175 def write_windows_console(self, *args):
176 retcode = subprocess.call(args,
177 # use a new console to not flood the test output
178 creationflags=subprocess.CREATE_NEW_CONSOLE,
179 # use a shell to hide the console window (SW_HIDE)
180 shell=True)
181 self.assertEqual(retcode, 0)
182
183 @unittest.skipUnless(sys.platform == 'win32',
184 'test specific to the Windows console')
185 def test_write_windows_console(self):
186 # Issue #11395: the Windows console returns an error (12: not enough
187 # space error) on writing into stdout if stdout mode is binary and the
188 # length is greater than 66,000 bytes (or less, depending on heap
189 # usage).
190 code = "print('x' * 100000)"
191 self.write_windows_console(sys.executable, "-c", code)
192 self.write_windows_console(sys.executable, "-u", "-c", code)
193
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000194 def fdopen_helper(self, *args):
195 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200196 f = os.fdopen(fd, *args)
197 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000198
199 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200200 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
201 os.close(fd)
202
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000203 self.fdopen_helper()
204 self.fdopen_helper('r')
205 self.fdopen_helper('r', 100)
206
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100207 def test_replace(self):
208 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 self.addCleanup(support.unlink, support.TESTFN)
210 self.addCleanup(support.unlink, TESTFN2)
211
212 create_file(support.TESTFN, b"1")
213 create_file(TESTFN2, b"2")
214
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100215 os.replace(support.TESTFN, TESTFN2)
216 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
217 with open(TESTFN2, 'r') as f:
218 self.assertEqual(f.read(), "1")
219
Martin Panterbf19d162015-09-09 01:01:13 +0000220 def test_open_keywords(self):
221 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
222 dir_fd=None)
223 os.close(f)
224
225 def test_symlink_keywords(self):
226 symlink = support.get_attribute(os, "symlink")
227 try:
228 symlink(src='target', dst=support.TESTFN,
229 target_is_directory=False, dir_fd=None)
230 except (NotImplementedError, OSError):
231 pass # No OS support or unprivileged user
232
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200233
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234# Test attributes on return values from os.*stat* family.
235class StatAttributeTests(unittest.TestCase):
236 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200237 self.fname = support.TESTFN
238 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100239 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240
Serhiy Storchaka43767632013-11-03 21:31:38 +0200241 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000243 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244
245 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(result[stat.ST_SIZE], 3)
247 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 # Make sure all the attributes are there
250 members = dir(result)
251 for name in dir(stat):
252 if name[:3] == 'ST_':
253 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000254 if name.endswith("TIME"):
255 def trunc(x): return int(x)
256 else:
257 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000260 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700263 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700264 for name in 'st_atime st_mtime st_ctime'.split():
265 floaty = int(getattr(result, name) * 100000)
266 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700267 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 try:
270 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 except IndexError:
273 pass
274
275 # Make sure that assignment fails
276 try:
277 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200278 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000279 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 pass
281
282 try:
283 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200284 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000285 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 pass
287
288 try:
289 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except AttributeError:
292 pass
293
294 # Use the stat_result constructor with a too-short tuple.
295 try:
296 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 except TypeError:
299 pass
300
Ezio Melotti42da6632011-03-15 05:18:48 +0200301 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 try:
303 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
304 except TypeError:
305 pass
306
Antoine Pitrou38425292010-09-21 18:19:07 +0000307 def test_stat_attributes(self):
308 self.check_stat_attributes(self.fname)
309
310 def test_stat_attributes_bytes(self):
311 try:
312 fname = self.fname.encode(sys.getfilesystemencoding())
313 except UnicodeEncodeError:
314 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700329 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100340 self.assertTrue(isinstance(result.f_fsid, int))
341
342 # Test that the size of the tuple doesn't change
343 self.assertEqual(len(result), 10)
344
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700374 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Berker Peksag0b4dc482016-09-17 15:49:59 +0300434 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
435 def test_access_denied(self):
436 # Default to FindFirstFile WIN32_FIND_DATA when access is
437 # denied. See issue 28075.
438 # os.environ['TEMP'] should be located on a volume that
439 # supports file ACLs.
440 fname = os.path.join(os.environ['TEMP'], self.fname)
441 self.addCleanup(support.unlink, fname)
442 create_file(fname, b'ABC')
443 # Deny the right to [S]YNCHRONIZE on the file to
444 # force CreateFile to fail with ERROR_ACCESS_DENIED.
445 DETACHED_PROCESS = 8
446 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500447 # bpo-30584: Use security identifier *S-1-5-32-545 instead
448 # of localized "Users" to not depend on the locale.
449 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300450 creationflags=DETACHED_PROCESS
451 )
452 result = os.stat(fname)
453 self.assertNotEqual(result.st_size, 0)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
456class UtimeTests(unittest.TestCase):
457 def setUp(self):
458 self.dirname = support.TESTFN
459 self.fname = os.path.join(self.dirname, "f1")
460
461 self.addCleanup(support.rmtree, self.dirname)
462 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100463 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200464
Victor Stinner47aacc82015-06-12 17:26:23 +0200465 def support_subsecond(self, filename):
466 # Heuristic to check if the filesystem supports timestamp with
467 # subsecond resolution: check if float and int timestamps are different
468 st = os.stat(filename)
469 return ((st.st_atime != st[7])
470 or (st.st_mtime != st[8])
471 or (st.st_ctime != st[9]))
472
473 def _test_utime(self, set_time, filename=None):
474 if not filename:
475 filename = self.fname
476
477 support_subsecond = self.support_subsecond(filename)
478 if support_subsecond:
479 # Timestamp with a resolution of 1 microsecond (10^-6).
480 #
481 # The resolution of the C internal function used by os.utime()
482 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
483 # test with a resolution of 1 ns requires more work:
484 # see the issue #15745.
485 atime_ns = 1002003000 # 1.002003 seconds
486 mtime_ns = 4005006000 # 4.005006 seconds
487 else:
488 # use a resolution of 1 second
489 atime_ns = 5 * 10**9
490 mtime_ns = 8 * 10**9
491
492 set_time(filename, (atime_ns, mtime_ns))
493 st = os.stat(filename)
494
495 if support_subsecond:
496 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
497 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
498 else:
499 self.assertEqual(st.st_atime, atime_ns * 1e-9)
500 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
501 self.assertEqual(st.st_atime_ns, atime_ns)
502 self.assertEqual(st.st_mtime_ns, mtime_ns)
503
504 def test_utime(self):
505 def set_time(filename, ns):
506 # test the ns keyword parameter
507 os.utime(filename, ns=ns)
508 self._test_utime(set_time)
509
510 @staticmethod
511 def ns_to_sec(ns):
512 # Convert a number of nanosecond (int) to a number of seconds (float).
513 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
514 # issue, os.utime() rounds towards minus infinity.
515 return (ns * 1e-9) + 0.5e-9
516
517 def test_utime_by_indexed(self):
518 # pass times as floating point seconds as the second indexed parameter
519 def set_time(filename, ns):
520 atime_ns, mtime_ns = ns
521 atime = self.ns_to_sec(atime_ns)
522 mtime = self.ns_to_sec(mtime_ns)
523 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
524 # or utime(time_t)
525 os.utime(filename, (atime, mtime))
526 self._test_utime(set_time)
527
528 def test_utime_by_times(self):
529 def set_time(filename, ns):
530 atime_ns, mtime_ns = ns
531 atime = self.ns_to_sec(atime_ns)
532 mtime = self.ns_to_sec(mtime_ns)
533 # test the times keyword parameter
534 os.utime(filename, times=(atime, mtime))
535 self._test_utime(set_time)
536
537 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
538 "follow_symlinks support for utime required "
539 "for this test.")
540 def test_utime_nofollow_symlinks(self):
541 def set_time(filename, ns):
542 # use follow_symlinks=False to test utimensat(timespec)
543 # or lutimes(timeval)
544 os.utime(filename, ns=ns, follow_symlinks=False)
545 self._test_utime(set_time)
546
547 @unittest.skipUnless(os.utime in os.supports_fd,
548 "fd support for utime required for this test.")
549 def test_utime_fd(self):
550 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100551 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200552 # use a file descriptor to test futimens(timespec)
553 # or futimes(timeval)
554 os.utime(fp.fileno(), ns=ns)
555 self._test_utime(set_time)
556
557 @unittest.skipUnless(os.utime in os.supports_dir_fd,
558 "dir_fd support for utime required for this test.")
559 def test_utime_dir_fd(self):
560 def set_time(filename, ns):
561 dirname, name = os.path.split(filename)
562 dirfd = os.open(dirname, os.O_RDONLY)
563 try:
564 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
565 os.utime(name, dir_fd=dirfd, ns=ns)
566 finally:
567 os.close(dirfd)
568 self._test_utime(set_time)
569
570 def test_utime_directory(self):
571 def set_time(filename, ns):
572 # test calling os.utime() on a directory
573 os.utime(filename, ns=ns)
574 self._test_utime(set_time, filename=self.dirname)
575
576 def _test_utime_current(self, set_time):
577 # Get the system clock
578 current = time.time()
579
580 # Call os.utime() to set the timestamp to the current system clock
581 set_time(self.fname)
582
583 if not self.support_subsecond(self.fname):
584 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700585 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200586 # On Windows, the usual resolution of time.time() is 15.6 ms.
587 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700588 #
589 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
590 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200591 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200592 st = os.stat(self.fname)
593 msg = ("st_time=%r, current=%r, dt=%r"
594 % (st.st_mtime, current, st.st_mtime - current))
595 self.assertAlmostEqual(st.st_mtime, current,
596 delta=delta, msg=msg)
597
598 def test_utime_current(self):
599 def set_time(filename):
600 # Set to the current time in the new way
601 os.utime(self.fname)
602 self._test_utime_current(set_time)
603
604 def test_utime_current_old(self):
605 def set_time(filename):
606 # Set to the current time in the old explicit way.
607 os.utime(self.fname, None)
608 self._test_utime_current(set_time)
609
610 def get_file_system(self, path):
611 if sys.platform == 'win32':
612 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
613 import ctypes
614 kernel32 = ctypes.windll.kernel32
615 buf = ctypes.create_unicode_buffer("", 100)
616 ok = kernel32.GetVolumeInformationW(root, None, 0,
617 None, None, None,
618 buf, len(buf))
619 if ok:
620 return buf.value
621 # return None if the filesystem is unknown
622
623 def test_large_time(self):
624 # Many filesystems are limited to the year 2038. At least, the test
625 # pass with NTFS filesystem.
626 if self.get_file_system(self.dirname) != "NTFS":
627 self.skipTest("requires NTFS")
628
629 large = 5000000000 # some day in 2128
630 os.utime(self.fname, (large, large))
631 self.assertEqual(os.stat(self.fname).st_mtime, large)
632
633 def test_utime_invalid_arguments(self):
634 # seconds and nanoseconds parameters are mutually exclusive
635 with self.assertRaises(ValueError):
636 os.utime(self.fname, (5, 5), ns=(5, 5))
637
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300638 @support.cpython_only
639 def test_issue31577(self):
640 # The interpreter shouldn't crash in case utime() received a bad
641 # ns argument.
642 def get_bad_int(divmod_ret_val):
643 class BadInt:
644 def __divmod__(*args):
645 return divmod_ret_val
646 return BadInt()
647 with self.assertRaises(TypeError):
648 os.utime(self.fname, ns=(get_bad_int(42), 1))
649 with self.assertRaises(TypeError):
650 os.utime(self.fname, ns=(get_bad_int(()), 1))
651 with self.assertRaises(TypeError):
652 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
653
Victor Stinner47aacc82015-06-12 17:26:23 +0200654
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000655from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000656
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000657class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000658 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000659 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000660
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000661 def setUp(self):
662 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000663 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000664 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000665 for key, value in self._reference().items():
666 os.environ[key] = value
667
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000668 def tearDown(self):
669 os.environ.clear()
670 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000671 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000672 os.environb.clear()
673 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000674
Christian Heimes90333392007-11-01 19:08:42 +0000675 def _reference(self):
676 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
677
678 def _empty_mapping(self):
679 os.environ.clear()
680 return os.environ
681
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000682 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200683 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
684 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000685 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000686 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300687 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200688 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300689 value = popen.read().strip()
690 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000691
Xavier de Gayed1415312016-07-22 12:15:29 +0200692 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
693 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000694 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200695 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
696 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300697 it = iter(popen)
698 self.assertEqual(next(it), "line1\n")
699 self.assertEqual(next(it), "line2\n")
700 self.assertEqual(next(it), "line3\n")
701 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000702
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000703 # Verify environ keys and values from the OS are of the
704 # correct str type.
705 def test_keyvalue_types(self):
706 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000707 self.assertEqual(type(key), str)
708 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000709
def test_items(self):
    # Every pair of the reference mapping must be readable back through
    # os.environ.get().
    expected = self._reference()
    for name in expected:
        self.assertEqual(os.environ.get(name), expected[name])
713
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000714 # Issue 7310
715 def test___repr__(self):
716 """Check that the repr() of os.environ looks like environ({...})."""
717 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000718 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
719 '{!r}: {!r}'.format(key, value)
720 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000721
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000722 def test_get_exec_path(self):
723 defpath_list = os.defpath.split(os.pathsep)
724 test_path = ['/monty', '/python', '', '/flying/circus']
725 test_env = {'PATH': os.pathsep.join(test_path)}
726
727 saved_environ = os.environ
728 try:
729 os.environ = dict(test_env)
730 # Test that defaulting to os.environ works.
731 self.assertSequenceEqual(test_path, os.get_exec_path())
732 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
733 finally:
734 os.environ = saved_environ
735
736 # No PATH environment variable
737 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
738 # Empty PATH environment variable
739 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
740 # Supplied PATH environment variable
741 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
742
Victor Stinnerb745a742010-05-18 17:17:23 +0000743 if os.supports_bytes_environ:
744 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000745 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000746 # ignore BytesWarning warning
747 with warnings.catch_warnings(record=True):
748 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000749 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000750 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000751 pass
752 else:
753 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000754
755 # bytes key and/or value
756 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
757 ['abc'])
758 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
759 ['abc'])
760 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
761 ['abc'])
762
763 @unittest.skipUnless(os.supports_bytes_environ,
764 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000765 def test_environb(self):
766 # os.environ -> os.environb
767 value = 'euro\u20ac'
768 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000769 value_bytes = value.encode(sys.getfilesystemencoding(),
770 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000771 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000772 msg = "U+20AC character is not encodable to %s" % (
773 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000774 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000775 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000776 self.assertEqual(os.environ['unicode'], value)
777 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000778
779 # os.environb -> os.environ
780 value = b'\xff'
781 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000782 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000783 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000784 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000785
# On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    # Deleting a variable whose name the platform rejects must raise.
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        expected_error, key = ValueError, 'x' * 50000
    else:
        # "=" is not allowed in a variable name
        expected_error, key = OSError, 'key='
    self.assertRaises(expected_error, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100797
Victor Stinner6d101392013-04-14 16:35:04 +0200798 def test_key_type(self):
799 missing = 'missingkey'
800 self.assertNotIn(missing, os.environ)
801
Victor Stinner839e5ea2013-04-14 16:43:03 +0200802 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200803 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200804 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200805 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200806
Victor Stinner839e5ea2013-04-14 16:43:03 +0200807 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200808 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200809 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200810 self.assertTrue(cm.exception.__suppress_context__)
811
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300812 def _test_environ_iteration(self, collection):
813 iterator = iter(collection)
814 new_key = "__new_key__"
815
816 next(iterator) # start iteration over os.environ.items
817
818 # add a new key in os.environ mapping
819 os.environ[new_key] = "test_environ_iteration"
820
821 try:
822 next(iterator) # force iteration over modified mapping
823 self.assertEqual(os.environ[new_key], "test_environ_iteration")
824 finally:
825 del os.environ[new_key]
826
def test_iter_error_when_changing_os_environ(self):
    # iterate the mapping itself (i.e. its keys)
    mapping = os.environ
    self._test_environ_iteration(mapping)
829
def test_iter_error_when_changing_os_environ_items(self):
    # iterate the (key, value) view
    items_view = os.environ.items()
    self._test_environ_iteration(items_view)
832
def test_iter_error_when_changing_os_environ_values(self):
    # iterate the values view
    values_view = os.environ.values()
    self._test_environ_iteration(values_view)
835
Victor Stinner6d101392013-04-14 16:35:04 +0200836
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # The file inside SUB21 is named "tmp5", as in the tree above and
        # as targeted by broken_link3 below.
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

        # Try to make SUB21 unreadable, then probe whether that took effect.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Really unreadable: give permissions back at cleanup time.
            # Cleanups run in reverse order, so this happens before the
            # rmtree() registered at the top of setUp.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod(0) did not block access here, so the "not readable"
            # scenario cannot be tested: remove SUB21 and drop it from
            # the expected tree.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        # Same pruning scenario, with the top given as a FakePath object
        # instead of a str.
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it has been listed, so the walk can no longer
        # find it under its old name.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001010
Charles-François Natali7372b062012-02-05 15:15:38 +01001011
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests expect.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single hook through which every test reaches os.fwalk(), so
        # subclasses can substitute another flavour.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that failure behind a NameError on `fd`.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1079
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        # Run os.walk() on the bytes form of `top`, but yield str results
        # so the inherited tests can be reused unchanged.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for broot, bdirs, bfiles in os.walk(encoded_top, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Copy the caller's in-place edits (e.g. pruning of dirs) back
            # into the lists that os.walk() itself holds.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1092
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        # Run os.fwalk() on the bytes form of `top`, but yield str names
        # (plus the untouched directory fd) for the inherited tests.
        encoded_top = os.fsencode(top)
        for broot, bdirs, bfiles, topfd in os.fwalk(encoded_top, *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # Copy the caller's in-place edits back into the lists that
            # os.fwalk() itself holds.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1104
Charles-François Natali7372b062012-02-05 15:15:38 +01001105
class MakedirTests(unittest.TestCase):
    # Tests for os.makedirs(); every test works below support.TESTFN,
    # which setUp creates and tearDown removes.

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The explicit mode applies to the leaf only; the
                # intermediate directory is expected to get 0o775 under
                # this umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() puts the previous umask back even when one of the
        # assertions below fails.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # A regular file in the way is an error whatever exist_ok says.
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1199
Andrew Svetlov405faed2012-12-25 12:18:09 +02001200
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    # Tests for os.chown(), run against a directory named support.TESTFN
    # that exists for the lifetime of the class.

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # Numbers that are not integers must be rejected for both ids.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and read it back each time.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and read it back each time.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one assertRaises block: the second is reached
        # only if the first does not raise.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1253
1254
class RemoveDirsTests(unittest.TestCase):
    # Tests for os.removedirs() on the nested tree TESTFN/dira/dirb.

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira and TESTFN/dira/dirb; return both paths.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every level is empty, so the whole chain up to TESTFN goes away.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal there: only dirb disappears.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail and remove nothing.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1294
1295
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        # os.devnull must accept unbuffered binary writes and read back
        # as empty.
        null_path = os.devnull
        with open(null_path, 'wb', 0) as sink:
            sink.write(b'hello')
            # explicit close inside the with block, kept from the original
            sink.close()
        with open(null_path, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001303
Andrew Svetlov405faed2012-12-25 12:18:09 +02001304
Guido van Rossume7ba4952007-06-06 23:52:48 +00001305class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001306 def test_urandom_length(self):
1307 self.assertEqual(len(os.urandom(0)), 0)
1308 self.assertEqual(len(os.urandom(1)), 1)
1309 self.assertEqual(len(os.urandom(10)), 10)
1310 self.assertEqual(len(os.urandom(100)), 100)
1311 self.assertEqual(len(os.urandom(1000)), 1000)
1312
1313 def test_urandom_value(self):
1314 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001315 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001316 data2 = os.urandom(16)
1317 self.assertNotEqual(data1, data2)
1318
1319 def get_urandom_subprocess(self, count):
1320 code = '\n'.join((
1321 'import os, sys',
1322 'data = os.urandom(%s)' % count,
1323 'sys.stdout.buffer.write(data)',
1324 'sys.stdout.buffer.flush()'))
1325 out = assert_python_ok('-c', code)
1326 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001327 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001328 return stdout
1329
1330 def test_urandom_subprocess(self):
1331 data1 = self.get_urandom_subprocess(16)
1332 data2 = self.get_urandom_subprocess(16)
1333 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001334
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001335
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Probe once: skip the whole class when the running kernel does
        # not implement the syscall.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        size = 16
        data = os.getrandom(size)
        self.assertIsInstance(data, bytes)
        self.assertEqual(len(data), size)

    def test_getrandom0(self):
        # asking for zero bytes yields an empty bytes object
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        flag_exists = hasattr(os, 'GRND_RANDOM')
        self.assertTrue(flag_exists)

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        # (BlockingIOError means: System urandom is not initialized yet)
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        # two successive draws must differ
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1377
1378
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of these build-time configuration variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001385
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter (via assert_python_ok) because
    the scripts lower the descriptor limit or close descriptors wholesale.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit is checked here; its output is
        # not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # First run: the two 4-byte draws must differ from each other, which
        # they would not if urandom() were reading the file of b"x" bytes.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # Second run: the output must also differ between processes.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1462 self.assertNotEqual(out2, out)
1463
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001464
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001465@contextlib.contextmanager
1466def _execvpe_mockup(defpath=None):
1467 """
1468 Stubs out execv and execve functions when used as context manager.
1469 Records exec calls. The mock execv and execve functions always raise an
1470 exception as they would normally never return.
1471 """
1472 # A list of tuples containing (function name, first arg, args)
1473 # of calls to execv or execve that have been made.
1474 calls = []
1475
1476 def mock_execv(name, *args):
1477 calls.append(('execv', name, args))
1478 raise RuntimeError("execv called")
1479
1480 def mock_execve(name, *args):
1481 calls.append(('execve', name, args))
1482 raise OSError(errno.ENOTDIR, "execve called")
1483
1484 try:
1485 orig_execv = os.execv
1486 orig_execve = os.execve
1487 orig_defpath = os.defpath
1488 os.execv = mock_execv
1489 os.execve = mock_execve
1490 if defpath is not None:
1491 os.defpath = defpath
1492 yield calls
1493 finally:
1494 os.execv = orig_execv
1495 os.execve = orig_execve
1496 os.defpath = orig_defpath
1497
Victor Stinner4659ccf2016-09-14 10:57:00 +02001498
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family, mostly on invalid input."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        missing = 'no such app-'
        self.assertRaises(OSError, os.execvpe, missing, [missing], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list, or an empty first argument, is rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe,
                              'notepad', bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with execv/execve mocked out, using either
        # bytes or str program names depending on *test_type*.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected path in the recorded call is bytes everywhere
            # except on Windows.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as recorded:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(recorded), 1)
            self.assertEqual(recorded[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as recorded:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as recorded:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is exercised everywhere except on Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for var_name, var_value in invalid_entries:
            newenv = os.environ.copy()
            newenv[var_name] = var_value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            winerror = exc.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1601
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001602
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that doesn't exist.

    Every test relies on support.TESTFN being absent, which setUp() enforces.
    """

    def setUp(self):
        # Fail early if the scratch name already exists (or cannot even be
        # stat'ed), since every test below assumes it is missing.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass  # expected: the file must not exist
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create a regular file with that name, then check that mkdir()
        # refuses to create a directory over it.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001637
Victor Stinnere77c9742016-03-25 10:28:23 +01001638
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when given an invalid file descriptor.

    Most of them are expected to raise OSError with errno EBADF (see
    check()).
    """

    # Functions taking the descriptor as their only argument; a test_<name>
    # method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(func_name):
        # Build a test method that checks os.<func_name>, when it exists.
        def helper(self):
            if hasattr(os, func_name):
                self.check(getattr(os, func_name))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(<bad fd>, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False rather than raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        first_fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for offset in range(10):
            try:
                os.fstat(first_fd + offset)
            except OSError:
                pass
            else:
                # This descriptor is valid: stop the scan just before it.
                break
        if offset < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(first_fd, first_fd + offset-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based function are given the fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based function are given the fd.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        scratch = bytearray(10)
        self.check(os.readv, [scratch])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1737
Brian Curtin1b9df392010-11-24 20:24:31 +00001738
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # Source and destination names; nothing is created here.
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # refer to the same open file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with open(file1, "r") as src, open(file2, "r") as dst:
            self.assertTrue(os.path.sameopenfile(src.fileno(), dst.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        fs_encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, fs_encoding),
                        bytes(self.file2, fs_encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1775
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Tests for the os.set*uid() / os.set*gid() family.

    When not running as uid 0, switching to id 0 must raise OSError (the
    group variants are skipped when HAVE_WHEEL_GROUP is true).  An id of
    1<<32 must always raise OverflowError.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        too_big = 1 << 32
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(OverflowError, os.setuid, too_big)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        too_big = 1 << 32
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(OverflowError, os.setgid, too_big)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        too_big = 1 << 32
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        self.assertRaises(OverflowError, os.seteuid, too_big)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        too_big = 1 << 32
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(OverflowError, os.setegid, too_big)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        too_big = 1 << 32
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # The overflow check applies to either argument.
        self.assertRaises(OverflowError, os.setreuid, too_big, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, too_big)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)',
        ]
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        too_big = 1 << 32
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            self.assertRaises(OSError, os.setregid, 0, 0)
        # The overflow check applies to either argument.
        self.assertRaises(OverflowError, os.setregid, too_big, 0)
        self.assertRaises(OverflowError, os.setregid, 0, too_big)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)',
        ]
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001831
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check that os functions cope with non-ASCII / undecodable file names.

    setUp() creates a directory holding one empty file per candidate name;
    the tests then list, open and stat those files through their str names.
    """

    def setUp(self):
        # Pick the directory name: prefer the unencodable one, then the
        # non-ASCII one, and fall back to the plain name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate names that can be encoded for the
        # filesystem; the others are silently dropped.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for candidate in candidates:
            try:
                bytesfn.append(os.fsencode(candidate))
            except UnicodeEncodeError:
                continue
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must succeed; the contents are irrelevant.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001903
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: plain exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds while the child is alive.
        # The loop's else clause runs only when the loop ends without the
        # ``break``, i.e. the message never showed up.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # One-byte tagged shared memory used as a readiness flag: it is set
        # to 0 here and polled until it reads 1 (presumably written by
        # win_console_handler.py, which receives the tag name -- that script
        # is not visible here).
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  Compare
        # against None explicitly so that a child which already exited with
        # status 0 is not mistaken for one that is still alive.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2018
2019
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and
        # FILE0/FILE1 files, remembering the sorted entry names.
        names = []
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, filename)
            os.makedirs(os.path.join(support.TESTFN, subdir))
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [subdir, filename]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def _expected_bytes(self):
        # The created entry names encoded with the filesystem encoding.
        return [os.fsencode(name) for name in self.created_paths]

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # str path in, str names out
        listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listing, self.created_paths)

        # bytes path in, bytes names out
        listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listing, self._expected_bytes())

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        full_path = os.path.abspath(support.TESTFN)

        # str path in, str names out
        listing = sorted(os.listdir('\\\\?\\' + full_path))
        self.assertEqual(listing, self.created_paths)

        # bytes path in, bytes names out
        listing = sorted(os.listdir(b'\\\\?\\' + os.fsencode(full_path)))
        self.assertEqual(listing, self._expected_bytes())
Tim Golden781bbeb2013-10-25 20:24:06 +01002066
2067
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # Link name / link target, first as str and then as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must exist and the link names must still be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def _symlink(self, target, link):
        # Create link -> target and schedule removal of the link.
        os.symlink(target, link)
        self.addCleanup(support.unlink, link)

    def test_not_symlink(self):
        # A regular file is rejected, given as str or as a path-like.
        pathlike_target = FakePath(self.filelink_target)
        for candidate in (self.filelink_target, pathlike_target):
            with self.assertRaises(OSError):
                os.readlink(candidate)

    def test_missing_link(self):
        for candidate in ('missing-link', FakePath('missing-link')):
            with self.assertRaises(FileNotFoundError):
                os.readlink(candidate)

    @support.skip_unless_symlink
    def test_pathlike(self):
        self._symlink(self.filelink_target, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        self._symlink(self.filelinkb_target, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        self._symlink(self.filelinkb_target, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2113
2114
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os/os.path functions on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # unittest assertions are used instead of ``assert`` statements so
        # the preconditions are still enforced when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists: this link's target never exists, so exists() is False.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() follows *link* to *target* while lstat() does not.

        The same two checks are repeated with the link given as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() through a relative symlink must give the same result
        # whichever directory is current when it is called.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def _symlink_then_remove(self, src, dest):
        # Try to create dest -> src.  A FileNotFoundError is acceptable;
        # a link that does get created is removed again on a best-effort
        # basis so it cannot disturb later test cases.
        try:
            os.symlink(src, dest)
        except FileNotFoundError:
            return
        try:
            os.remove(dest)
        except OSError:
            pass

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            self._symlink_then_remove(src, dest)
            # Also test with bytes, since that is a separate code path.
            self._symlink_then_remove(os.fsencode(src), os.fsencode(dest))
Brian Curtind40e6f72010-07-08 21:39:08 +00002274
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Junction points created through _winapi.CreateJunction."""

    # Relative junction name; the target is the directory holding this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions are used instead of ``assert`` statements so
        # the preconditions are still enforced when running under -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2304
2305
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion rather than a bare ``assert``: the statement
        # form is stripped under -O, which would leave this test checking
        # nothing at all.
        self.assertTrue(os.path.isdir(src))
2336
2337
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # fsdecode(fsencode(x)) == x for every name that can be encoded
        names = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in names:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not encodable here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002351
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002352
Brett Cannonefb00c02012-02-29 18:31:31 -05002353
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # A file descriptor that does not exist yields None.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (sys.platform.startswith('win') or
                          (hasattr(locale, 'nl_langinfo') and
                           hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding
        # name must be reported and must be known to the codecs module.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2367
2368
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid, which must be our own pid.
        code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', code],
                                 stdout=subprocess.PIPE)
        out, _ = child.communicate()
        self.assertEqual(int(out), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Passing the executable as a path-like object adds an implicit
        # test for PyUnicode_FSConverter().
        pid = os.spawnv(os.P_NOWAIT, FakePath(argv[0]), argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2385
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002386
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to support.TESTFN and return its argv.

        The script exits with self.exitcode (set here to 17).

        with_env  -- also build self.env, a copy of os.environ extended
                     with a unique key (self.key) that the script reads
                     before exiting.
        use_bytes -- return the argv items as bytes; when self.env was
                     built, its keys and values are encoded as well.
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding
            # it unconditionally would raise AttributeError.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers the exit code is expected in the
            # high byte of the reported status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* copes with awkward environment entries.

        spawn -- callable invoked as spawn(mode, path, args, env), e.g.
                 os.spawnve or os.spawnvpe.

        Entries with a null character, or with '=' in the name, must
        either raise ValueError or make the child exit with status 127.
        An '=' inside a value is legal and must reach the child intact.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = [
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        ]
        for key, value in invalid_entries:
            # Start from a fresh copy so each case is tested in isolation.
            newenv = os.environ.copy()
            newenv[key] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2565
Serhiy Storchaka77703942017-06-25 07:33:01 +03002566
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced, so it is skipped unconditionally for now to keep
# the buildbots moving.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2575
2576
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Starting at 19 or above, the increment is not visible.
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Best-effort restore of the original priority; being refused
            # with EACCES is tolerated, anything else is re-raised.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2599
2600
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening socket served by an asyncore loop in its own thread.

    Each accepted connection gets a Handler; only the handler of the most
    recently accepted connection is kept, in ``handler_instance``.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that stores whatever the peer sends."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            # Received chunks, in arrival order.
            self.in_buffer = []
            # While false, incoming data is read but thrown away.
            self.accumulate = True
            # Becomes true once handle_close() has run.
            self.closed = False
            # Greeting sent to the client as soon as it is connected.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all accumulated data as a single bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Propagate the active exception instead of using the
            # dispatcher's default error handling.
            raise

    def __init__(self, address):
        """Bind and listen on *address*, a (host, port) pair."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # The address actually bound (e.g. after asking for port 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to finish and wait for the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even if the loop iteration raises;
            # a bare acquire()/release() pair would leave it held.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Propagate the active exception instead of using the dispatcher's
        # default error handling.
        raise
2685
2686
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    Every test gets a freshly started server and a connected client socket.
    The file ``support.TESTFN`` holds ``DATA`` for the lifetime of the class.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # The headers/trailers tests are skipped on Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET on disconnect) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # transient condition: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, hence the ** spelling).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (not just its hash), as the other
        # tests in this class do.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are sent before the trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY / EAGAIN are tolerated here; anything else is a failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002891
2892
def supports_extended_attributes():
    """Return True if os.setxattr() exists and works on a scratch file.

    A file is exclusively created at support.TESTFN, a ``user.test``
    attribute is set on it, and the file is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe_path = support.TESTFN
    supported = True
    try:
        with open(probe_path, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(probe_path)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002907
2908
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario with attribute names converted by *s*.

        *s* converts a str attribute name to the type under test; the four
        callables are the xattr functions to exercise and *kwargs* is
        forwarded to the get/set/remove calls.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_errno(code, func, *args):
            # The call must fail with OSError carrying exactly *code*.
            with self.assertRaises(OSError) as cm:
                func(path, *args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        def listed():
            return set(listxattr(path))

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create, then replace, user.test.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name / XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(listed(), expected)
        removexattr(path, s("user.test"), **kwargs)

        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A larger value round-trips too.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(listed(), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then with fsencoded names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each os function so that it is given a file descriptor
        # instead of the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2992
2993
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    @contextlib.contextmanager
    def _skip_if_unqueryable(self):
        """Turn the expected "not a terminal" OSErrors into a skip."""
        try:
            yield
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        with self._skip_if_unqueryable():
            size = os.get_terminal_size()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            size = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        rows, columns = size[0], size[1]
        expected = (int(columns), int(rows))  # reversed order

        with self._skip_if_unqueryable():
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3037
3038
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to os calls."""

    def setUp(self):
        class Str(str):
            pass

        unencodable = support.TESTFN_UNENCODABLE
        decoded = unencodable if unencodable is not None else support.TESTFN
        undecodable = support.TESTFN_UNDECODABLE
        if undecodable is not None:
            encoded = undecodable
        else:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry is (filenames to try, function, *extra arguments).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = as_text if on_windows else every
            funcs.append((readlink_names, os.readlink,))

        for candidates, func, *extra_args in funcs:
            for name in candidates:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Other path types must also emit a
                        # DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra_args)
                    else:
                        func(name, *extra_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3134
class CPUCountTests(unittest.TestCase):
    """Tests for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3143
Victor Stinnerdaf45552013-08-28 00:53:59 +02003144
class FDInheritanceTests(unittest.TestCase):
    """Check that newly created file descriptors are non-inheritable."""

    def _open_this_file(self):
        """Open this source file read-only and close it at test cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        cleared = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, cleared)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        for end in (rfd, wfd):
            self.addCleanup(os.close, end)
        self.assertEqual(os.get_inheritable(rfd), False)
        self.assertEqual(os.get_inheritable(wfd), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(fd, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private_target = self._open_this_file()
        self.assertEqual(os.dup2(fd, private_target, inheritable=False),
                         private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        for end in (master_fd, slave_fd):
            self.addCleanup(os.close, end)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3223
3224
class PathTConverterTests(unittest.TestCase):
    """Check how os functions convert str, bytes, PathLike and fd arguments."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        # No bytes variants are exercised when os.name is 'nt'.
        bytes_filename = None
        bytes_fspath = None
        if os.name != 'nt':
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)
        candidate_paths = (str_filename, bytes_filename, str_fspath,
                           bytes_fspath)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                def call_and_cleanup(arg):
                    # Call fn and hand its result to the cleanup hook, if any.
                    result = fn(arg, *extra_args)
                    if cleanup_fn is not None:
                        cleanup_fn(result)

                for path in candidate_paths:
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        call_and_cleanup(path)

                # A PathLike whose __fspath__ result is an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    call_and_cleanup(fd)  # should not fail
3280
3281
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Tests for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor starts out blocking.
        self.assertEqual(os.get_blocking(fd), True)

        # Toggle off and back on, reading the mode back each time.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3295
3296
Yury Selivanov97e2e062014-09-26 12:33:06 -04003297
class ExportsTests(unittest.TestCase):
    """Check that selected names are exported through os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        self.assertIn('open', exported)
        self.assertIn('walk', exported)
3302
3303
Victor Stinner6036e442015-03-08 01:58:04 +01003304class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003305 check_no_resource_warning = support.check_no_resource_warning
3306
def setUp(self):
    """Create a scratch directory and register its removal."""
    scratch = os.path.realpath(support.TESTFN)
    self.path = scratch
    self.bytes_path = os.fsencode(scratch)
    # Cleanup is registered before mkdir, so it runs even if mkdir fails.
    self.addCleanup(support.rmtree, scratch)
    os.mkdir(scratch)
3312
def create_file(self, name="file.txt"):
    """Create *name* inside the scratch directory and return its full path.

    A bytes *name* is joined onto the bytes form of the directory so the
    returned path has the same type as *name*.
    """
    if isinstance(name, bytes):
        base = self.bytes_path
    else:
        base = self.path
    filename = os.path.join(base, name)
    # Delegates to the module-level create_file() helper.
    create_file(filename, b'python')
    return filename
3318
def get_entries(self, names):
    """Scan self.path and return its entries as a ``{name: DirEntry}`` dict.

    Asserts that the sorted entry names equal *names*.
    """
    by_name = {item.name: item for item in os.scandir(self.path)}
    self.assertEqual(sorted(by_name.keys()), names)
    return by_name
3324
def assert_stat_equal(self, stat1, stat2, skip_fields):
    """Assert that two stat results match.

    When *skip_fields* is true, every ``st_*`` attribute is compared
    individually except st_dev, st_ino and st_nlink; otherwise the two
    objects are compared as a whole.
    """
    if not skip_fields:
        self.assertEqual(stat1, stat2)
        return

    ignored = ("st_dev", "st_ino", "st_nlink")
    for field in dir(stat1):
        if field.startswith("st_") and field not in ignored:
            self.assertEqual(getattr(stat1, field),
                             getattr(stat2, field),
                             (stat1, stat2, field))
3337
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
    """Cross-check a DirEntry against os.stat() / os.path for the same path.

    *name* is the expected entry name.  Of the three flags only
    *is_symlink* is consulted here, to decide how strictly stat results
    are compared when os.name is 'nt'.
    """
    self.assertIsInstance(entry, os.DirEntry)
    self.assertEqual(entry.name, name)
    self.assertEqual(entry.path, os.path.join(self.path, name))
    inode_stat = os.stat(entry.path, follow_symlinks=False)
    self.assertEqual(entry.inode(), inode_stat.st_ino)

    # Following symlinks.
    followed = os.stat(entry.path)
    followed_mode = followed.st_mode
    self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed_mode))
    self.assertEqual(entry.is_file(), stat.S_ISREG(followed_mode))
    self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

    # Not following symlinks.
    unfollowed = os.stat(entry.path, follow_symlinks=False)
    unfollowed_mode = unfollowed.st_mode
    self.assertEqual(entry.is_dir(follow_symlinks=False),
                     stat.S_ISDIR(unfollowed_mode))
    self.assertEqual(entry.is_file(follow_symlinks=False),
                     stat.S_ISREG(unfollowed_mode))

    # When os.name is 'nt' some stat fields are skipped in the comparison.
    on_nt = os.name == 'nt'
    self.assert_stat_equal(entry.stat(),
                           followed,
                           on_nt and not is_symlink)
    self.assert_stat_equal(entry.stat(follow_symlinks=False),
                           unfollowed,
                           on_nt)
3365
3366 def test_attributes(self):
3367 link = hasattr(os, 'link')
3368 symlink = support.can_symlink()
3369
3370 dirname = os.path.join(self.path, "dir")
3371 os.mkdir(dirname)
3372 filename = self.create_file("file.txt")
3373 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003374 try:
3375 os.link(filename, os.path.join(self.path, "link_file.txt"))
3376 except PermissionError as e:
3377 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003378 if symlink:
3379 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3380 target_is_directory=True)
3381 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3382
3383 names = ['dir', 'file.txt']
3384 if link:
3385 names.append('link_file.txt')
3386 if symlink:
3387 names.extend(('symlink_dir', 'symlink_file.txt'))
3388 entries = self.get_entries(names)
3389
3390 entry = entries['dir']
3391 self.check_entry(entry, 'dir', True, False, False)
3392
3393 entry = entries['file.txt']
3394 self.check_entry(entry, 'file.txt', False, True, False)
3395
3396 if link:
3397 entry = entries['link_file.txt']
3398 self.check_entry(entry, 'link_file.txt', False, True, False)
3399
3400 if symlink:
3401 entry = entries['symlink_dir']
3402 self.check_entry(entry, 'symlink_dir', True, False, True)
3403
3404 entry = entries['symlink_file.txt']
3405 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3406
3407 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003408 path = self.bytes_path if isinstance(name, bytes) else self.path
3409 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003410 self.assertEqual(len(entries), 1)
3411
3412 entry = entries[0]
3413 self.assertEqual(entry.name, name)
3414 return entry
3415
Brett Cannon96881cd2016-06-10 14:37:21 -07003416 def create_file_entry(self, name='file.txt'):
3417 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003418 return self.get_entry(os.path.basename(filename))
3419
3420 def test_current_directory(self):
3421 filename = self.create_file()
3422 old_dir = os.getcwd()
3423 try:
3424 os.chdir(self.path)
3425
3426 # call scandir() without parameter: it must list the content
3427 # of the current directory
3428 entries = dict((entry.name, entry) for entry in os.scandir())
3429 self.assertEqual(sorted(entries.keys()),
3430 [os.path.basename(filename)])
3431 finally:
3432 os.chdir(old_dir)
3433
3434 def test_repr(self):
3435 entry = self.create_file_entry()
3436 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3437
Brett Cannon96881cd2016-06-10 14:37:21 -07003438 def test_fspath_protocol(self):
3439 entry = self.create_file_entry()
3440 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3441
3442 def test_fspath_protocol_bytes(self):
3443 bytes_filename = os.fsencode('bytesfile.txt')
3444 bytes_entry = self.create_file_entry(name=bytes_filename)
3445 fspath = os.fspath(bytes_entry)
3446 self.assertIsInstance(fspath, bytes)
3447 self.assertEqual(fspath,
3448 os.path.join(os.fsencode(self.path),bytes_filename))
3449
Victor Stinner6036e442015-03-08 01:58:04 +01003450 def test_removed_dir(self):
3451 path = os.path.join(self.path, 'dir')
3452
3453 os.mkdir(path)
3454 entry = self.get_entry('dir')
3455 os.rmdir(path)
3456
3457 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3458 if os.name == 'nt':
3459 self.assertTrue(entry.is_dir())
3460 self.assertFalse(entry.is_file())
3461 self.assertFalse(entry.is_symlink())
3462 if os.name == 'nt':
3463 self.assertRaises(FileNotFoundError, entry.inode)
3464 # don't fail
3465 entry.stat()
3466 entry.stat(follow_symlinks=False)
3467 else:
3468 self.assertGreater(entry.inode(), 0)
3469 self.assertRaises(FileNotFoundError, entry.stat)
3470 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3471
3472 def test_removed_file(self):
3473 entry = self.create_file_entry()
3474 os.unlink(entry.path)
3475
3476 self.assertFalse(entry.is_dir())
3477 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3478 if os.name == 'nt':
3479 self.assertTrue(entry.is_file())
3480 self.assertFalse(entry.is_symlink())
3481 if os.name == 'nt':
3482 self.assertRaises(FileNotFoundError, entry.inode)
3483 # don't fail
3484 entry.stat()
3485 entry.stat(follow_symlinks=False)
3486 else:
3487 self.assertGreater(entry.inode(), 0)
3488 self.assertRaises(FileNotFoundError, entry.stat)
3489 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3490
3491 def test_broken_symlink(self):
3492 if not support.can_symlink():
3493 return self.skipTest('cannot create symbolic link')
3494
3495 filename = self.create_file("file.txt")
3496 os.symlink(filename,
3497 os.path.join(self.path, "symlink.txt"))
3498 entries = self.get_entries(['file.txt', 'symlink.txt'])
3499 entry = entries['symlink.txt']
3500 os.unlink(filename)
3501
3502 self.assertGreater(entry.inode(), 0)
3503 self.assertFalse(entry.is_dir())
3504 self.assertFalse(entry.is_file()) # broken symlink returns False
3505 self.assertFalse(entry.is_dir(follow_symlinks=False))
3506 self.assertFalse(entry.is_file(follow_symlinks=False))
3507 self.assertTrue(entry.is_symlink())
3508 self.assertRaises(FileNotFoundError, entry.stat)
3509 # don't fail
3510 entry.stat(follow_symlinks=False)
3511
3512 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003513 self.create_file("file.txt")
3514
3515 path_bytes = os.fsencode(self.path)
3516 entries = list(os.scandir(path_bytes))
3517 self.assertEqual(len(entries), 1, entries)
3518 entry = entries[0]
3519
3520 self.assertEqual(entry.name, b'file.txt')
3521 self.assertEqual(entry.path,
3522 os.fsencode(os.path.join(self.path, 'file.txt')))
3523
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003524 def test_bytes_like(self):
3525 self.create_file("file.txt")
3526
3527 for cls in bytearray, memoryview:
3528 path_bytes = cls(os.fsencode(self.path))
3529 with self.assertWarns(DeprecationWarning):
3530 entries = list(os.scandir(path_bytes))
3531 self.assertEqual(len(entries), 1, entries)
3532 entry = entries[0]
3533
3534 self.assertEqual(entry.name, b'file.txt')
3535 self.assertEqual(entry.path,
3536 os.fsencode(os.path.join(self.path, 'file.txt')))
3537 self.assertIs(type(entry.name), bytes)
3538 self.assertIs(type(entry.path), bytes)
3539
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003540 @unittest.skipUnless(os.listdir in os.supports_fd,
3541 'fd support for listdir required for this test.')
3542 def test_fd(self):
3543 self.assertIn(os.scandir, os.supports_fd)
3544 self.create_file('file.txt')
3545 expected_names = ['file.txt']
3546 if support.can_symlink():
3547 os.symlink('file.txt', os.path.join(self.path, 'link'))
3548 expected_names.append('link')
3549
3550 fd = os.open(self.path, os.O_RDONLY)
3551 try:
3552 with os.scandir(fd) as it:
3553 entries = list(it)
3554 names = [entry.name for entry in entries]
3555 self.assertEqual(sorted(names), expected_names)
3556 self.assertEqual(names, os.listdir(fd))
3557 for entry in entries:
3558 self.assertEqual(entry.path, entry.name)
3559 self.assertEqual(os.fspath(entry), entry.name)
3560 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3561 if os.stat in os.supports_dir_fd:
3562 st = os.stat(entry.name, dir_fd=fd)
3563 self.assertEqual(entry.stat(), st)
3564 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3565 self.assertEqual(entry.stat(follow_symlinks=False), st)
3566 finally:
3567 os.close(fd)
3568
Victor Stinner6036e442015-03-08 01:58:04 +01003569 def test_empty_path(self):
3570 self.assertRaises(FileNotFoundError, os.scandir, '')
3571
3572 def test_consume_iterator_twice(self):
3573 self.create_file("file.txt")
3574 iterator = os.scandir(self.path)
3575
3576 entries = list(iterator)
3577 self.assertEqual(len(entries), 1, entries)
3578
3579 # check than consuming the iterator twice doesn't raise exception
3580 entries2 = list(iterator)
3581 self.assertEqual(len(entries2), 0, entries2)
3582
3583 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003584 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003585 self.assertRaises(TypeError, os.scandir, obj)
3586
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003587 def test_close(self):
3588 self.create_file("file.txt")
3589 self.create_file("file2.txt")
3590 iterator = os.scandir(self.path)
3591 next(iterator)
3592 iterator.close()
3593 # multiple closes
3594 iterator.close()
3595 with self.check_no_resource_warning():
3596 del iterator
3597
3598 def test_context_manager(self):
3599 self.create_file("file.txt")
3600 self.create_file("file2.txt")
3601 with os.scandir(self.path) as iterator:
3602 next(iterator)
3603 with self.check_no_resource_warning():
3604 del iterator
3605
3606 def test_context_manager_close(self):
3607 self.create_file("file.txt")
3608 self.create_file("file2.txt")
3609 with os.scandir(self.path) as iterator:
3610 next(iterator)
3611 iterator.close()
3612
3613 def test_context_manager_exception(self):
3614 self.create_file("file.txt")
3615 self.create_file("file2.txt")
3616 with self.assertRaises(ZeroDivisionError):
3617 with os.scandir(self.path) as iterator:
3618 next(iterator)
3619 1/0
3620 with self.check_no_resource_warning():
3621 del iterator
3622
3623 def test_resource_warning(self):
3624 self.create_file("file.txt")
3625 self.create_file("file2.txt")
3626 iterator = os.scandir(self.path)
3627 next(iterator)
3628 with self.assertWarns(ResourceWarning):
3629 del iterator
3630 support.gc_collect()
3631 # exhausted iterator
3632 iterator = os.scandir(self.path)
3633 list(iterator)
3634 with self.check_no_resource_warning():
3635 del iterator
3636
Victor Stinner6036e442015-03-08 01:58:04 +01003637
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol (PEP 519)."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes arguments must compare equal to what fspath() returns.
        for b in b'hello', b'goodbye', b'some/path/and/file':
            self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        # str arguments must compare equal to what fspath() returns.
        for s in 'hello', 'goodbye', 'some/path/and/file':
            self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        # A path-like object wrapping either str or bytes must be accepted
        # by os.fsencode() and os.fsdecode() as well.
        for p in "path/like/object", b"path/like/object":
            pathlike = FakePath(p)

            self.assertEqual(p, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertIsInstance(FakePath('x'), os.PathLike)

    def test_garbage_in_exception_out(self):
        # Objects that are neither str, bytes nor path-like are rejected.
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003683
Victor Stinnerc29b5852017-11-02 07:28:27 -07003684
class TimesTests(unittest.TestCase):
    """Tests for the structure returned by os.times()."""

    def test_times(self):
        times = os.times()
        self.assertIsInstance(times, os.times_result)

        # Every named field must be exposed as a float.
        for field in ('user', 'system', 'children_user', 'children_system',
                      'elapsed'):
            value = getattr(times, field)
            self.assertIsInstance(value, float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            self.assertEqual(times.children_user, 0)
            self.assertEqual(times.children_system, 0)
            self.assertEqual(times.elapsed, 0)
3699
3700
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against os._fspath instead of
        # os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003709
3710
# Run the whole module through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()