blob: 2340b848d9b7022571869566119d7ad74ad94671 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020064from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
Berker Peksag4af23d72016-09-15 20:32:44 +030084def requires_os_func(name):
85 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
86
87
Victor Stinnerae39d232016-03-24 17:12:55 +010088def create_file(filename, content=b'content'):
89 with open(filename, "xb", 0) as fp:
90 fp.write(content)
91
92
Thomas Wouters0e3f5912006-08-11 14:57:12 +000093# Tests creating TESTFN
94class FileTests(unittest.TestCase):
95 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000096 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000097 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000098 tearDown = setUp
99
100 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000101 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000102 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000103 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000104
Christian Heimesfdab48e2008-01-20 09:06:41 +0000105 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000106 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
107 # We must allocate two consecutive file descriptors, otherwise
108 # it will mess up other file descriptors (perhaps even the three
109 # standard ones).
110 second = os.dup(first)
111 try:
112 retries = 0
113 while second != first + 1:
114 os.close(first)
115 retries += 1
116 if retries > 10:
117 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000118 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000119 first, second = second, os.dup(second)
120 finally:
121 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000122 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000123 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000124 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000126 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000127 def test_rename(self):
128 path = support.TESTFN
129 old = sys.getrefcount(path)
130 self.assertRaises(TypeError, os.rename, path, 0)
131 new = sys.getrefcount(path)
132 self.assertEqual(old, new)
133
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000134 def test_read(self):
135 with open(support.TESTFN, "w+b") as fobj:
136 fobj.write(b"spam")
137 fobj.flush()
138 fd = fobj.fileno()
139 os.lseek(fd, 0, 0)
140 s = os.read(fd, 4)
141 self.assertEqual(type(s), bytes)
142 self.assertEqual(s, b"spam")
143
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200144 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200145 # Skip the test on 32-bit platforms: the number of bytes must fit in a
146 # Py_ssize_t type
147 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
148 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200149 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
150 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200151 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100152 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200153
154 # Issue #21932: Make sure that os.read() does not raise an
155 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200156 with open(support.TESTFN, "rb") as fp:
157 data = os.read(fp.fileno(), size)
158
Victor Stinner8c663fd2017-11-08 14:44:44 -0800159 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200160 # operating system is free to return less bytes than requested.
161 self.assertEqual(data, b'test')
162
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000163 def test_write(self):
164 # os.write() accepts bytes- and buffer-like objects but not strings
165 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
166 self.assertRaises(TypeError, os.write, fd, "beans")
167 os.write(fd, b"bacon\n")
168 os.write(fd, bytearray(b"eggs\n"))
169 os.write(fd, memoryview(b"spam\n"))
170 os.close(fd)
171 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000172 self.assertEqual(fobj.read().splitlines(),
173 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000174
Victor Stinnere0daff12011-03-20 23:36:35 +0100175 def write_windows_console(self, *args):
176 retcode = subprocess.call(args,
177 # use a new console to not flood the test output
178 creationflags=subprocess.CREATE_NEW_CONSOLE,
179 # use a shell to hide the console window (SW_HIDE)
180 shell=True)
181 self.assertEqual(retcode, 0)
182
183 @unittest.skipUnless(sys.platform == 'win32',
184 'test specific to the Windows console')
185 def test_write_windows_console(self):
186 # Issue #11395: the Windows console returns an error (12: not enough
187 # space error) on writing into stdout if stdout mode is binary and the
188 # length is greater than 66,000 bytes (or less, depending on heap
189 # usage).
190 code = "print('x' * 100000)"
191 self.write_windows_console(sys.executable, "-c", code)
192 self.write_windows_console(sys.executable, "-u", "-c", code)
193
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000194 def fdopen_helper(self, *args):
195 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200196 f = os.fdopen(fd, *args)
197 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000198
199 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200200 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
201 os.close(fd)
202
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000203 self.fdopen_helper()
204 self.fdopen_helper('r')
205 self.fdopen_helper('r', 100)
206
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100207 def test_replace(self):
208 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 self.addCleanup(support.unlink, support.TESTFN)
210 self.addCleanup(support.unlink, TESTFN2)
211
212 create_file(support.TESTFN, b"1")
213 create_file(TESTFN2, b"2")
214
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100215 os.replace(support.TESTFN, TESTFN2)
216 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
217 with open(TESTFN2, 'r') as f:
218 self.assertEqual(f.read(), "1")
219
Martin Panterbf19d162015-09-09 01:01:13 +0000220 def test_open_keywords(self):
221 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
222 dir_fd=None)
223 os.close(f)
224
225 def test_symlink_keywords(self):
226 symlink = support.get_attribute(os, "symlink")
227 try:
228 symlink(src='target', dst=support.TESTFN,
229 target_is_directory=False, dir_fd=None)
230 except (NotImplementedError, OSError):
231 pass # No OS support or unprivileged user
232
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200233
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234# Test attributes on return values from os.*stat* family.
235class StatAttributeTests(unittest.TestCase):
236 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200237 self.fname = support.TESTFN
238 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100239 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240
Antoine Pitrou38425292010-09-21 18:19:07 +0000241 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000243
244 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(result[stat.ST_SIZE], 3)
246 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000247
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248 # Make sure all the attributes are there
249 members = dir(result)
250 for name in dir(stat):
251 if name[:3] == 'ST_':
252 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000253 if name.endswith("TIME"):
254 def trunc(x): return int(x)
255 else:
256 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000257 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000258 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000259 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000260
Larry Hastings6fe20b32012-04-19 15:07:49 -0700261 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700262 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700263 for name in 'st_atime st_mtime st_ctime'.split():
264 floaty = int(getattr(result, name) * 100000)
265 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700266 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700267
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000268 try:
269 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200270 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271 except IndexError:
272 pass
273
274 # Make sure that assignment fails
275 try:
276 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200277 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000278 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000279 pass
280
281 try:
282 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200283 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000284 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000285 pass
286
287 try:
288 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200289 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000290 except AttributeError:
291 pass
292
293 # Use the stat_result constructor with a too-short tuple.
294 try:
295 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200296 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000297 except TypeError:
298 pass
299
Ezio Melotti42da6632011-03-15 05:18:48 +0200300 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 try:
302 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
303 except TypeError:
304 pass
305
Antoine Pitrou38425292010-09-21 18:19:07 +0000306 def test_stat_attributes(self):
307 self.check_stat_attributes(self.fname)
308
309 def test_stat_attributes_bytes(self):
310 try:
311 fname = self.fname.encode(sys.getfilesystemencoding())
312 except UnicodeEncodeError:
313 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700314 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000315
Christian Heimes25827622013-10-12 01:27:08 +0200316 def test_stat_result_pickle(self):
317 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200318 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
319 p = pickle.dumps(result, proto)
320 self.assertIn(b'stat_result', p)
321 if proto < 4:
322 self.assertIn(b'cos\nstat_result\n', p)
323 unpickled = pickle.loads(p)
324 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200325
Serhiy Storchaka43767632013-11-03 21:31:38 +0200326 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000327 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700328 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000329
330 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000331 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000332
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000333 # Make sure all the attributes are there.
334 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
335 'ffree', 'favail', 'flag', 'namemax')
336 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000337 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000338
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100339 self.assertTrue(isinstance(result.f_fsid, int))
340
341 # Test that the size of the tuple doesn't change
342 self.assertEqual(len(result), 10)
343
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000344 # Make sure that assignment really fails
345 try:
346 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200347 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000348 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000349 pass
350
351 try:
352 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200353 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354 except AttributeError:
355 pass
356
357 # Use the constructor with a too-short tuple.
358 try:
359 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200360 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361 except TypeError:
362 pass
363
Ezio Melotti42da6632011-03-15 05:18:48 +0200364 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 try:
366 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
367 except TypeError:
368 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000369
Christian Heimes25827622013-10-12 01:27:08 +0200370 @unittest.skipUnless(hasattr(os, 'statvfs'),
371 "need os.statvfs()")
372 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700373 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200374
Serhiy Storchakabad12572014-12-15 14:03:42 +0200375 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
376 p = pickle.dumps(result, proto)
377 self.assertIn(b'statvfs_result', p)
378 if proto < 4:
379 self.assertIn(b'cos\nstatvfs_result\n', p)
380 unpickled = pickle.loads(p)
381 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200382
Serhiy Storchaka43767632013-11-03 21:31:38 +0200383 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
384 def test_1686475(self):
385 # Verify that an open file can be stat'ed
386 try:
387 os.stat(r"c:\pagefile.sys")
388 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600389 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200390 except OSError as e:
391 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000392
Serhiy Storchaka43767632013-11-03 21:31:38 +0200393 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
394 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
395 def test_15261(self):
396 # Verify that stat'ing a closed fd does not cause crash
397 r, w = os.pipe()
398 try:
399 os.stat(r) # should not raise error
400 finally:
401 os.close(r)
402 os.close(w)
403 with self.assertRaises(OSError) as ctx:
404 os.stat(r)
405 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100406
Zachary Ware63f277b2014-06-19 09:46:37 -0500407 def check_file_attributes(self, result):
408 self.assertTrue(hasattr(result, 'st_file_attributes'))
409 self.assertTrue(isinstance(result.st_file_attributes, int))
410 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
411
412 @unittest.skipUnless(sys.platform == "win32",
413 "st_file_attributes is Win32 specific")
414 def test_file_attributes(self):
415 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
416 result = os.stat(self.fname)
417 self.check_file_attributes(result)
418 self.assertEqual(
419 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
420 0)
421
422 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200423 dirname = support.TESTFN + "dir"
424 os.mkdir(dirname)
425 self.addCleanup(os.rmdir, dirname)
426
427 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500428 self.check_file_attributes(result)
429 self.assertEqual(
430 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
431 stat.FILE_ATTRIBUTE_DIRECTORY)
432
Berker Peksag0b4dc482016-09-17 15:49:59 +0300433 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
434 def test_access_denied(self):
435 # Default to FindFirstFile WIN32_FIND_DATA when access is
436 # denied. See issue 28075.
437 # os.environ['TEMP'] should be located on a volume that
438 # supports file ACLs.
439 fname = os.path.join(os.environ['TEMP'], self.fname)
440 self.addCleanup(support.unlink, fname)
441 create_file(fname, b'ABC')
442 # Deny the right to [S]YNCHRONIZE on the file to
443 # force CreateFile to fail with ERROR_ACCESS_DENIED.
444 DETACHED_PROCESS = 8
445 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500446 # bpo-30584: Use security identifier *S-1-5-32-545 instead
447 # of localized "Users" to not depend on the locale.
448 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300449 creationflags=DETACHED_PROCESS
450 )
451 result = os.stat(fname)
452 self.assertNotEqual(result.st_size, 0)
453
Victor Stinner47aacc82015-06-12 17:26:23 +0200454
455class UtimeTests(unittest.TestCase):
456 def setUp(self):
457 self.dirname = support.TESTFN
458 self.fname = os.path.join(self.dirname, "f1")
459
460 self.addCleanup(support.rmtree, self.dirname)
461 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100462 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200463
Victor Stinner47aacc82015-06-12 17:26:23 +0200464 def support_subsecond(self, filename):
465 # Heuristic to check if the filesystem supports timestamp with
466 # subsecond resolution: check if float and int timestamps are different
467 st = os.stat(filename)
468 return ((st.st_atime != st[7])
469 or (st.st_mtime != st[8])
470 or (st.st_ctime != st[9]))
471
472 def _test_utime(self, set_time, filename=None):
473 if not filename:
474 filename = self.fname
475
476 support_subsecond = self.support_subsecond(filename)
477 if support_subsecond:
478 # Timestamp with a resolution of 1 microsecond (10^-6).
479 #
480 # The resolution of the C internal function used by os.utime()
481 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
482 # test with a resolution of 1 ns requires more work:
483 # see the issue #15745.
484 atime_ns = 1002003000 # 1.002003 seconds
485 mtime_ns = 4005006000 # 4.005006 seconds
486 else:
487 # use a resolution of 1 second
488 atime_ns = 5 * 10**9
489 mtime_ns = 8 * 10**9
490
491 set_time(filename, (atime_ns, mtime_ns))
492 st = os.stat(filename)
493
494 if support_subsecond:
495 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
496 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
497 else:
498 self.assertEqual(st.st_atime, atime_ns * 1e-9)
499 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
500 self.assertEqual(st.st_atime_ns, atime_ns)
501 self.assertEqual(st.st_mtime_ns, mtime_ns)
502
503 def test_utime(self):
504 def set_time(filename, ns):
505 # test the ns keyword parameter
506 os.utime(filename, ns=ns)
507 self._test_utime(set_time)
508
509 @staticmethod
510 def ns_to_sec(ns):
511 # Convert a number of nanosecond (int) to a number of seconds (float).
512 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
513 # issue, os.utime() rounds towards minus infinity.
514 return (ns * 1e-9) + 0.5e-9
515
516 def test_utime_by_indexed(self):
517 # pass times as floating point seconds as the second indexed parameter
518 def set_time(filename, ns):
519 atime_ns, mtime_ns = ns
520 atime = self.ns_to_sec(atime_ns)
521 mtime = self.ns_to_sec(mtime_ns)
522 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
523 # or utime(time_t)
524 os.utime(filename, (atime, mtime))
525 self._test_utime(set_time)
526
527 def test_utime_by_times(self):
528 def set_time(filename, ns):
529 atime_ns, mtime_ns = ns
530 atime = self.ns_to_sec(atime_ns)
531 mtime = self.ns_to_sec(mtime_ns)
532 # test the times keyword parameter
533 os.utime(filename, times=(atime, mtime))
534 self._test_utime(set_time)
535
536 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
537 "follow_symlinks support for utime required "
538 "for this test.")
539 def test_utime_nofollow_symlinks(self):
540 def set_time(filename, ns):
541 # use follow_symlinks=False to test utimensat(timespec)
542 # or lutimes(timeval)
543 os.utime(filename, ns=ns, follow_symlinks=False)
544 self._test_utime(set_time)
545
546 @unittest.skipUnless(os.utime in os.supports_fd,
547 "fd support for utime required for this test.")
548 def test_utime_fd(self):
549 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100550 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200551 # use a file descriptor to test futimens(timespec)
552 # or futimes(timeval)
553 os.utime(fp.fileno(), ns=ns)
554 self._test_utime(set_time)
555
556 @unittest.skipUnless(os.utime in os.supports_dir_fd,
557 "dir_fd support for utime required for this test.")
558 def test_utime_dir_fd(self):
559 def set_time(filename, ns):
560 dirname, name = os.path.split(filename)
561 dirfd = os.open(dirname, os.O_RDONLY)
562 try:
563 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
564 os.utime(name, dir_fd=dirfd, ns=ns)
565 finally:
566 os.close(dirfd)
567 self._test_utime(set_time)
568
569 def test_utime_directory(self):
570 def set_time(filename, ns):
571 # test calling os.utime() on a directory
572 os.utime(filename, ns=ns)
573 self._test_utime(set_time, filename=self.dirname)
574
575 def _test_utime_current(self, set_time):
576 # Get the system clock
577 current = time.time()
578
579 # Call os.utime() to set the timestamp to the current system clock
580 set_time(self.fname)
581
582 if not self.support_subsecond(self.fname):
583 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700584 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200585 # On Windows, the usual resolution of time.time() is 15.6 ms.
586 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700587 #
588 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
589 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200590 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200591 st = os.stat(self.fname)
592 msg = ("st_time=%r, current=%r, dt=%r"
593 % (st.st_mtime, current, st.st_mtime - current))
594 self.assertAlmostEqual(st.st_mtime, current,
595 delta=delta, msg=msg)
596
597 def test_utime_current(self):
598 def set_time(filename):
599 # Set to the current time in the new way
600 os.utime(self.fname)
601 self._test_utime_current(set_time)
602
603 def test_utime_current_old(self):
604 def set_time(filename):
605 # Set to the current time in the old explicit way.
606 os.utime(self.fname, None)
607 self._test_utime_current(set_time)
608
609 def get_file_system(self, path):
610 if sys.platform == 'win32':
611 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
612 import ctypes
613 kernel32 = ctypes.windll.kernel32
614 buf = ctypes.create_unicode_buffer("", 100)
615 ok = kernel32.GetVolumeInformationW(root, None, 0,
616 None, None, None,
617 buf, len(buf))
618 if ok:
619 return buf.value
620 # return None if the filesystem is unknown
621
622 def test_large_time(self):
623 # Many filesystems are limited to the year 2038. At least, the test
624 # pass with NTFS filesystem.
625 if self.get_file_system(self.dirname) != "NTFS":
626 self.skipTest("requires NTFS")
627
628 large = 5000000000 # some day in 2128
629 os.utime(self.fname, (large, large))
630 self.assertEqual(os.stat(self.fname).st_mtime, large)
631
632 def test_utime_invalid_arguments(self):
633 # seconds and nanoseconds parameters are mutually exclusive
634 with self.assertRaises(ValueError):
635 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200636 with self.assertRaises(TypeError):
637 os.utime(self.fname, [5, 5])
638 with self.assertRaises(TypeError):
639 os.utime(self.fname, (5,))
640 with self.assertRaises(TypeError):
641 os.utime(self.fname, (5, 5, 5))
642 with self.assertRaises(TypeError):
643 os.utime(self.fname, ns=[5, 5])
644 with self.assertRaises(TypeError):
645 os.utime(self.fname, ns=(5,))
646 with self.assertRaises(TypeError):
647 os.utime(self.fname, ns=(5, 5, 5))
648
649 if os.utime not in os.supports_follow_symlinks:
650 with self.assertRaises(NotImplementedError):
651 os.utime(self.fname, (5, 5), follow_symlinks=False)
652 if os.utime not in os.supports_fd:
653 with open(self.fname, 'wb', 0) as fp:
654 with self.assertRaises(TypeError):
655 os.utime(fp.fileno(), (5, 5))
656 if os.utime not in os.supports_dir_fd:
657 with self.assertRaises(NotImplementedError):
658 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200659
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300660 @support.cpython_only
661 def test_issue31577(self):
662 # The interpreter shouldn't crash in case utime() received a bad
663 # ns argument.
664 def get_bad_int(divmod_ret_val):
665 class BadInt:
666 def __divmod__(*args):
667 return divmod_ret_val
668 return BadInt()
669 with self.assertRaises(TypeError):
670 os.utime(self.fname, ns=(get_bad_int(42), 1))
671 with self.assertRaises(TypeError):
672 os.utime(self.fname, ns=(get_bad_int(()), 1))
673 with self.assertRaises(TypeError):
674 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
675
Victor Stinner47aacc82015-06-12 17:26:23 +0200676
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000677from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000679class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000681 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000682
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000683 def setUp(self):
684 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000685 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000686 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000687 for key, value in self._reference().items():
688 os.environ[key] = value
689
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000690 def tearDown(self):
691 os.environ.clear()
692 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000693 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000694 os.environb.clear()
695 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000696
Christian Heimes90333392007-11-01 19:08:42 +0000697 def _reference(self):
698 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
699
700 def _empty_mapping(self):
701 os.environ.clear()
702 return os.environ
703
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000704 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200705 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
706 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000707 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000708 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300709 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200710 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300711 value = popen.read().strip()
712 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000713
Xavier de Gayed1415312016-07-22 12:15:29 +0200714 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
715 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000716 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200717 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
718 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300719 it = iter(popen)
720 self.assertEqual(next(it), "line1\n")
721 self.assertEqual(next(it), "line2\n")
722 self.assertEqual(next(it), "line3\n")
723 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000724
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000725 # Verify environ keys and values from the OS are of the
726 # correct str type.
727 def test_keyvalue_types(self):
728 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000729 self.assertEqual(type(key), str)
730 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000731
Christian Heimes90333392007-11-01 19:08:42 +0000732 def test_items(self):
733 for key, value in self._reference().items():
734 self.assertEqual(os.environ.get(key), value)
735
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000736 # Issue 7310
737 def test___repr__(self):
738 """Check that the repr() of os.environ looks like environ({...})."""
739 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000740 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
741 '{!r}: {!r}'.format(key, value)
742 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000743
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000744 def test_get_exec_path(self):
745 defpath_list = os.defpath.split(os.pathsep)
746 test_path = ['/monty', '/python', '', '/flying/circus']
747 test_env = {'PATH': os.pathsep.join(test_path)}
748
749 saved_environ = os.environ
750 try:
751 os.environ = dict(test_env)
752 # Test that defaulting to os.environ works.
753 self.assertSequenceEqual(test_path, os.get_exec_path())
754 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
755 finally:
756 os.environ = saved_environ
757
758 # No PATH environment variable
759 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
760 # Empty PATH environment variable
761 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
762 # Supplied PATH environment variable
763 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
764
Victor Stinnerb745a742010-05-18 17:17:23 +0000765 if os.supports_bytes_environ:
766 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000767 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000768 # ignore BytesWarning warning
769 with warnings.catch_warnings(record=True):
770 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000771 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000772 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000773 pass
774 else:
775 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000776
777 # bytes key and/or value
778 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
779 ['abc'])
780 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
781 ['abc'])
782 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
783 ['abc'])
784
785 @unittest.skipUnless(os.supports_bytes_environ,
786 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000787 def test_environb(self):
788 # os.environ -> os.environb
789 value = 'euro\u20ac'
790 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000791 value_bytes = value.encode(sys.getfilesystemencoding(),
792 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000793 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000794 msg = "U+20AC character is not encodable to %s" % (
795 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000796 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000797 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000798 self.assertEqual(os.environ['unicode'], value)
799 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800
801 # os.environb -> os.environ
802 value = b'\xff'
803 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000804 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000805 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000806 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000807
Victor Stinner13ff2452018-01-22 18:32:50 +0100808 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100809 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100810 def test_unset_error(self):
811 if sys.platform == "win32":
812 # an environment variable is limited to 32,767 characters
813 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100814 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100815 else:
816 # "=" is not allowed in a variable name
817 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100818 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100819
Victor Stinner6d101392013-04-14 16:35:04 +0200820 def test_key_type(self):
821 missing = 'missingkey'
822 self.assertNotIn(missing, os.environ)
823
Victor Stinner839e5ea2013-04-14 16:43:03 +0200824 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200825 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200827 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200828
Victor Stinner839e5ea2013-04-14 16:43:03 +0200829 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200830 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200831 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200832 self.assertTrue(cm.exception.__suppress_context__)
833
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300834 def _test_environ_iteration(self, collection):
835 iterator = iter(collection)
836 new_key = "__new_key__"
837
838 next(iterator) # start iteration over os.environ.items
839
840 # add a new key in os.environ mapping
841 os.environ[new_key] = "test_environ_iteration"
842
843 try:
844 next(iterator) # force iteration over modified mapping
845 self.assertEqual(os.environ[new_key], "test_environ_iteration")
846 finally:
847 del os.environ[new_key]
848
849 def test_iter_error_when_changing_os_environ(self):
850 self._test_environ_iteration(os.environ)
851
852 def test_iter_error_when_changing_os_environ_items(self):
853 self._test_environ_iteration(os.environ.items())
854
855 def test_iter_error_when_changing_os_environ_values(self):
856 self._test_environ_iteration(os.environ.values())
857
Victor Stinner6d101392013-04-14 16:35:04 +0200858
Tim Petersc4e09402003-04-25 07:11:48 +0000859class WalkTests(unittest.TestCase):
860 """Tests for os.walk()."""
861
Victor Stinner0561c532015-03-12 10:28:24 +0100862 # Wrapper to hide minor differences between os.walk and os.fwalk
863 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200864 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200865 if 'follow_symlinks' in kwargs:
866 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200867 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100868
Charles-François Natali7372b062012-02-05 15:15:38 +0100869 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100870 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100871 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000872
873 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000874 # TESTFN/
875 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000876 # tmp1
877 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000878 # tmp2
879 # SUB11/ no kids
880 # SUB2/ a file kid and a dirsymlink kid
881 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300882 # SUB21/ not readable
883 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000884 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200885 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300886 # broken_link2
887 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000888 # TEST2/
889 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100890 self.walk_path = join(support.TESTFN, "TEST1")
891 self.sub1_path = join(self.walk_path, "SUB1")
892 self.sub11_path = join(self.sub1_path, "SUB11")
893 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300894 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100895 tmp1_path = join(self.walk_path, "tmp1")
896 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000897 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300898 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100899 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000900 t2_path = join(support.TESTFN, "TEST2")
901 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200902 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300903 broken_link2_path = join(sub2_path, "broken_link2")
904 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000905
906 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100907 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000908 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300909 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000910 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100911
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300912 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100913 with open(path, "x") as f:
914 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000915
Victor Stinner0561c532015-03-12 10:28:24 +0100916 if support.can_symlink():
917 os.symlink(os.path.abspath(t2_path), self.link_path)
918 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300919 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
920 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300921 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300922 ["broken_link", "broken_link2", "broken_link3",
923 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100924 else:
pxinwr3e028b22019-02-15 13:04:47 +0800925 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100926
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300927 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300928 try:
929 os.listdir(sub21_path)
930 except PermissionError:
931 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
932 else:
933 os.chmod(sub21_path, stat.S_IRWXU)
934 os.unlink(tmp5_path)
935 os.rmdir(sub21_path)
936 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300937
Victor Stinner0561c532015-03-12 10:28:24 +0100938 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000939 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300940 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100941
Tim Petersc4e09402003-04-25 07:11:48 +0000942 self.assertEqual(len(all), 4)
943 # We can't know which order SUB1 and SUB2 will appear in.
944 # Not flipped: TESTFN, SUB1, SUB11, SUB2
945 # flipped: TESTFN, SUB2, SUB1, SUB11
946 flipped = all[0][1][0] != "SUB1"
947 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200948 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300949 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100950 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
951 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
952 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
953 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000954
Brett Cannon3f9183b2016-08-26 14:44:48 -0700955 def test_walk_prune(self, walk_path=None):
956 if walk_path is None:
957 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000958 # Prune the search.
959 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700960 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000961 all.append((root, dirs, files))
962 # Don't descend into SUB1.
963 if 'SUB1' in dirs:
964 # Note that this also mutates the dirs we appended to all!
965 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000966
Victor Stinner0561c532015-03-12 10:28:24 +0100967 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200968 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100969
970 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300971 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100972 self.assertEqual(all[1], self.sub2_tree)
973
Brett Cannon3f9183b2016-08-26 14:44:48 -0700974 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200975 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700976
Victor Stinner0561c532015-03-12 10:28:24 +0100977 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000978 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100979 all = list(self.walk(self.walk_path, topdown=False))
980
Victor Stinner53b0a412016-03-26 01:12:36 +0100981 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000982 # We can't know which order SUB1 and SUB2 will appear in.
983 # Not flipped: SUB11, SUB1, SUB2, TESTFN
984 # flipped: SUB2, SUB11, SUB1, TESTFN
985 flipped = all[3][1][0] != "SUB1"
986 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200987 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300988 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100989 self.assertEqual(all[3],
990 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
991 self.assertEqual(all[flipped],
992 (self.sub11_path, [], []))
993 self.assertEqual(all[flipped + 1],
994 (self.sub1_path, ["SUB11"], ["tmp2"]))
995 self.assertEqual(all[2 - 2 * flipped],
996 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000997
Victor Stinner0561c532015-03-12 10:28:24 +0100998 def test_walk_symlink(self):
999 if not support.can_symlink():
1000 self.skipTest("need symlink support")
1001
1002 # Walk, following symlinks.
1003 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1004 for root, dirs, files in walk_it:
1005 if root == self.link_path:
1006 self.assertEqual(dirs, [])
1007 self.assertEqual(files, ["tmp4"])
1008 break
1009 else:
1010 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001011
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001012 def test_walk_bad_dir(self):
1013 # Walk top-down.
1014 errors = []
1015 walk_it = self.walk(self.walk_path, onerror=errors.append)
1016 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001017 self.assertEqual(errors, [])
1018 dir1 = 'SUB1'
1019 path1 = os.path.join(root, dir1)
1020 path1new = os.path.join(root, dir1 + '.new')
1021 os.rename(path1, path1new)
1022 try:
1023 roots = [r for r, d, f in walk_it]
1024 self.assertTrue(errors)
1025 self.assertNotIn(path1, roots)
1026 self.assertNotIn(path1new, roots)
1027 for dir2 in dirs:
1028 if dir2 != dir1:
1029 self.assertIn(os.path.join(root, dir2), roots)
1030 finally:
1031 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001032
Charles-François Natali7372b062012-02-05 15:15:38 +01001033
1034@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1035class FwalkTests(WalkTests):
1036 """Tests for os.fwalk()."""
1037
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001038 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001039 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001040 yield (root, dirs, files)
1041
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001042 def fwalk(self, *args, **kwargs):
1043 return os.fwalk(*args, **kwargs)
1044
Larry Hastingsc48fe982012-06-25 04:49:05 -07001045 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1046 """
1047 compare with walk() results.
1048 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001049 walk_kwargs = walk_kwargs.copy()
1050 fwalk_kwargs = fwalk_kwargs.copy()
1051 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1052 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1053 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001054
Charles-François Natali7372b062012-02-05 15:15:38 +01001055 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001056 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001057 expected[root] = (set(dirs), set(files))
1058
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001059 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001060 self.assertIn(root, expected)
1061 self.assertEqual(expected[root], (set(dirs), set(files)))
1062
Larry Hastingsc48fe982012-06-25 04:49:05 -07001063 def test_compare_to_walk(self):
1064 kwargs = {'top': support.TESTFN}
1065 self._compare_to_walk(kwargs, kwargs)
1066
Charles-François Natali7372b062012-02-05 15:15:38 +01001067 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001068 try:
1069 fd = os.open(".", os.O_RDONLY)
1070 walk_kwargs = {'top': support.TESTFN}
1071 fwalk_kwargs = walk_kwargs.copy()
1072 fwalk_kwargs['dir_fd'] = fd
1073 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1074 finally:
1075 os.close(fd)
1076
1077 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001078 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001079 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1080 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001081 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001082 # check that the FD is valid
1083 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001084 # redundant check
1085 os.stat(rootfd)
1086 # check that listdir() returns consistent information
1087 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001088
1089 def test_fd_leak(self):
1090 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1091 # we both check that calling fwalk() a large number of times doesn't
1092 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1093 minfd = os.dup(1)
1094 os.close(minfd)
1095 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001096 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001097 pass
1098 newfd = os.dup(1)
1099 self.addCleanup(os.close, newfd)
1100 self.assertEqual(newfd, minfd)
1101
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001102class BytesWalkTests(WalkTests):
1103 """Tests for os.walk() with bytes."""
1104 def walk(self, top, **kwargs):
1105 if 'follow_symlinks' in kwargs:
1106 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1107 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1108 root = os.fsdecode(broot)
1109 dirs = list(map(os.fsdecode, bdirs))
1110 files = list(map(os.fsdecode, bfiles))
1111 yield (root, dirs, files)
1112 bdirs[:] = list(map(os.fsencode, dirs))
1113 bfiles[:] = list(map(os.fsencode, files))
1114
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001115@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1116class BytesFwalkTests(FwalkTests):
1117 """Tests for os.walk() with bytes."""
1118 def fwalk(self, top='.', *args, **kwargs):
1119 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1120 root = os.fsdecode(broot)
1121 dirs = list(map(os.fsdecode, bdirs))
1122 files = list(map(os.fsdecode, bfiles))
1123 yield (root, dirs, files, topfd)
1124 bdirs[:] = list(map(os.fsencode, dirs))
1125 bfiles[:] = list(map(os.fsencode, files))
1126
Charles-François Natali7372b062012-02-05 15:15:38 +01001127
Guido van Rossume7ba4952007-06-06 23:52:48 +00001128class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001129 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001130 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001131
1132 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001133 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001134 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1135 os.makedirs(path) # Should work
1136 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1137 os.makedirs(path)
1138
1139 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001140 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001141 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1142 os.makedirs(path)
1143 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1144 'dir5', 'dir6')
1145 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001146
Serhiy Storchakae304e332017-03-24 13:27:42 +02001147 def test_mode(self):
1148 with support.temp_umask(0o002):
1149 base = support.TESTFN
1150 parent = os.path.join(base, 'dir1')
1151 path = os.path.join(parent, 'dir2')
1152 os.makedirs(path, 0o555)
1153 self.assertTrue(os.path.exists(path))
1154 self.assertTrue(os.path.isdir(path))
1155 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001156 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1157 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001158
Terry Reedy5a22b652010-12-02 07:05:56 +00001159 def test_exist_ok_existing_directory(self):
1160 path = os.path.join(support.TESTFN, 'dir1')
1161 mode = 0o777
1162 old_mask = os.umask(0o022)
1163 os.makedirs(path, mode)
1164 self.assertRaises(OSError, os.makedirs, path, mode)
1165 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001166 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001167 os.makedirs(path, mode=mode, exist_ok=True)
1168 os.umask(old_mask)
1169
Martin Pantera82642f2015-11-19 04:48:44 +00001170 # Issue #25583: A drive root could raise PermissionError on Windows
1171 os.makedirs(os.path.abspath('/'), exist_ok=True)
1172
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001173 def test_exist_ok_s_isgid_directory(self):
1174 path = os.path.join(support.TESTFN, 'dir1')
1175 S_ISGID = stat.S_ISGID
1176 mode = 0o777
1177 old_mask = os.umask(0o022)
1178 try:
1179 existing_testfn_mode = stat.S_IMODE(
1180 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001181 try:
1182 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001183 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001184 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001185 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1186 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1187 # The os should apply S_ISGID from the parent dir for us, but
1188 # this test need not depend on that behavior. Be explicit.
1189 os.makedirs(path, mode | S_ISGID)
1190 # http://bugs.python.org/issue14992
1191 # Should not fail when the bit is already set.
1192 os.makedirs(path, mode, exist_ok=True)
1193 # remove the bit.
1194 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001195 # May work even when the bit is not already set when demanded.
1196 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001197 finally:
1198 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001199
1200 def test_exist_ok_existing_regular_file(self):
1201 base = support.TESTFN
1202 path = os.path.join(support.TESTFN, 'dir1')
1203 f = open(path, 'w')
1204 f.write('abc')
1205 f.close()
1206 self.assertRaises(OSError, os.makedirs, path)
1207 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1208 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1209 os.remove(path)
1210
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001211 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001212 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001213 'dir4', 'dir5', 'dir6')
1214 # If the tests failed, the bottom-most directory ('../dir6')
1215 # may not have been created, so we look for the outermost directory
1216 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001217 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001218 path = os.path.dirname(path)
1219
1220 os.removedirs(path)
1221
Andrew Svetlov405faed2012-12-25 12:18:09 +02001222
R David Murrayf2ad1732014-12-25 18:36:56 -05001223@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1224class ChownFileTests(unittest.TestCase):
1225
Berker Peksag036a71b2015-07-21 09:29:48 +03001226 @classmethod
1227 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001228 os.mkdir(support.TESTFN)
1229
1230 def test_chown_uid_gid_arguments_must_be_index(self):
1231 stat = os.stat(support.TESTFN)
1232 uid = stat.st_uid
1233 gid = stat.st_gid
1234 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1235 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1236 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1237 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1238 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1239
1240 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1241 def test_chown(self):
1242 gid_1, gid_2 = groups[:2]
1243 uid = os.stat(support.TESTFN).st_uid
1244 os.chown(support.TESTFN, uid, gid_1)
1245 gid = os.stat(support.TESTFN).st_gid
1246 self.assertEqual(gid, gid_1)
1247 os.chown(support.TESTFN, uid, gid_2)
1248 gid = os.stat(support.TESTFN).st_gid
1249 self.assertEqual(gid, gid_2)
1250
1251 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1252 "test needs root privilege and more than one user")
1253 def test_chown_with_root(self):
1254 uid_1, uid_2 = all_users[:2]
1255 gid = os.stat(support.TESTFN).st_gid
1256 os.chown(support.TESTFN, uid_1, gid)
1257 uid = os.stat(support.TESTFN).st_uid
1258 self.assertEqual(uid, uid_1)
1259 os.chown(support.TESTFN, uid_2, gid)
1260 uid = os.stat(support.TESTFN).st_uid
1261 self.assertEqual(uid, uid_2)
1262
1263 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1264 "test needs non-root account and more than one user")
1265 def test_chown_without_permission(self):
1266 uid_1, uid_2 = all_users[:2]
1267 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001268 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001269 os.chown(support.TESTFN, uid_1, gid)
1270 os.chown(support.TESTFN, uid_2, gid)
1271
Berker Peksag036a71b2015-07-21 09:29:48 +03001272 @classmethod
1273 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001274 os.rmdir(support.TESTFN)
1275
1276
Andrew Svetlov405faed2012-12-25 12:18:09 +02001277class RemoveDirsTests(unittest.TestCase):
1278 def setUp(self):
1279 os.makedirs(support.TESTFN)
1280
1281 def tearDown(self):
1282 support.rmtree(support.TESTFN)
1283
1284 def test_remove_all(self):
1285 dira = os.path.join(support.TESTFN, 'dira')
1286 os.mkdir(dira)
1287 dirb = os.path.join(dira, 'dirb')
1288 os.mkdir(dirb)
1289 os.removedirs(dirb)
1290 self.assertFalse(os.path.exists(dirb))
1291 self.assertFalse(os.path.exists(dira))
1292 self.assertFalse(os.path.exists(support.TESTFN))
1293
1294 def test_remove_partial(self):
1295 dira = os.path.join(support.TESTFN, 'dira')
1296 os.mkdir(dira)
1297 dirb = os.path.join(dira, 'dirb')
1298 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001299 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001300 os.removedirs(dirb)
1301 self.assertFalse(os.path.exists(dirb))
1302 self.assertTrue(os.path.exists(dira))
1303 self.assertTrue(os.path.exists(support.TESTFN))
1304
1305 def test_remove_nothing(self):
1306 dira = os.path.join(support.TESTFN, 'dira')
1307 os.mkdir(dira)
1308 dirb = os.path.join(dira, 'dirb')
1309 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001310 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001311 with self.assertRaises(OSError):
1312 os.removedirs(dirb)
1313 self.assertTrue(os.path.exists(dirb))
1314 self.assertTrue(os.path.exists(dira))
1315 self.assertTrue(os.path.exists(support.TESTFN))
1316
1317
Guido van Rossume7ba4952007-06-06 23:52:48 +00001318class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001319 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001320 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001321 f.write(b'hello')
1322 f.close()
1323 with open(os.devnull, 'rb') as f:
1324 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001325
Andrew Svetlov405faed2012-12-25 12:18:09 +02001326
Guido van Rossume7ba4952007-06-06 23:52:48 +00001327class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001328 def test_urandom_length(self):
1329 self.assertEqual(len(os.urandom(0)), 0)
1330 self.assertEqual(len(os.urandom(1)), 1)
1331 self.assertEqual(len(os.urandom(10)), 10)
1332 self.assertEqual(len(os.urandom(100)), 100)
1333 self.assertEqual(len(os.urandom(1000)), 1000)
1334
1335 def test_urandom_value(self):
1336 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001337 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001338 data2 = os.urandom(16)
1339 self.assertNotEqual(data1, data2)
1340
1341 def get_urandom_subprocess(self, count):
1342 code = '\n'.join((
1343 'import os, sys',
1344 'data = os.urandom(%s)' % count,
1345 'sys.stdout.buffer.write(data)',
1346 'sys.stdout.buffer.flush()'))
1347 out = assert_python_ok('-c', code)
1348 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001349 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001350 return stdout
1351
1352 def test_urandom_subprocess(self):
1353 data1 = self.get_urandom_subprocess(16)
1354 data2 = self.get_urandom_subprocess(16)
1355 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001356
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001357
Victor Stinner9b1f4742016-09-06 16:18:52 -07001358@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1359class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001360 @classmethod
1361 def setUpClass(cls):
1362 try:
1363 os.getrandom(1)
1364 except OSError as exc:
1365 if exc.errno == errno.ENOSYS:
1366 # Python compiled on a more recent Linux version
1367 # than the current Linux kernel
1368 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1369 else:
1370 raise
1371
Victor Stinner9b1f4742016-09-06 16:18:52 -07001372 def test_getrandom_type(self):
1373 data = os.getrandom(16)
1374 self.assertIsInstance(data, bytes)
1375 self.assertEqual(len(data), 16)
1376
1377 def test_getrandom0(self):
1378 empty = os.getrandom(0)
1379 self.assertEqual(empty, b'')
1380
1381 def test_getrandom_random(self):
1382 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1383
1384 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1385 # resource /dev/random
1386
1387 def test_getrandom_nonblock(self):
1388 # The call must not fail. Check also that the flag exists
1389 try:
1390 os.getrandom(1, os.GRND_NONBLOCK)
1391 except BlockingIOError:
1392 # System urandom is not initialized yet
1393 pass
1394
1395 def test_getrandom_value(self):
1396 data1 = os.getrandom(16)
1397 data2 = os.getrandom(16)
1398 self.assertNotEqual(data1, data2)
1399
1400
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001401# os.urandom() doesn't use a file descriptor when it is implemented with the
1402# getentropy() function, the getrandom() function or the getrandom() syscall
1403OS_URANDOM_DONT_USE_FD = (
1404 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1405 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1406 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001407
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001408@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1409 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001410class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001411 @unittest.skipUnless(resource, "test requires the resource module")
1412 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001413 # Check urandom() failing when it is not able to open /dev/random.
1414 # We spawn a new process to make the test more robust (if getrlimit()
1415 # failed to restore the file descriptor limit after this, the whole
1416 # test suite would crash; this actually happened on the OS X Tiger
1417 # buildbot).
1418 code = """if 1:
1419 import errno
1420 import os
1421 import resource
1422
1423 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1424 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1425 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001426 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001427 except OSError as e:
1428 assert e.errno == errno.EMFILE, e.errno
1429 else:
1430 raise AssertionError("OSError not raised")
1431 """
1432 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001433
Antoine Pitroue472aea2014-04-26 14:33:03 +02001434 def test_urandom_fd_closed(self):
1435 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1436 # closed.
1437 code = """if 1:
1438 import os
1439 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001440 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001441 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001442 with test.support.SuppressCrashReport():
1443 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001444 sys.stdout.buffer.write(os.urandom(4))
1445 """
1446 rc, out, err = assert_python_ok('-Sc', code)
1447
1448 def test_urandom_fd_reopened(self):
1449 # Issue #21207: urandom() should detect its fd to /dev/urandom
1450 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001451 self.addCleanup(support.unlink, support.TESTFN)
1452 create_file(support.TESTFN, b"x" * 256)
1453
Antoine Pitroue472aea2014-04-26 14:33:03 +02001454 code = """if 1:
1455 import os
1456 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001457 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001458 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001459 with test.support.SuppressCrashReport():
1460 for fd in range(3, 256):
1461 try:
1462 os.close(fd)
1463 except OSError:
1464 pass
1465 else:
1466 # Found the urandom fd (XXX hopefully)
1467 break
1468 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001469 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001470 new_fd = f.fileno()
1471 # Issue #26935: posix allows new_fd and fd to be equal but
1472 # some libc implementations have dup2 return an error in this
1473 # case.
1474 if new_fd != fd:
1475 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001476 sys.stdout.buffer.write(os.urandom(4))
1477 sys.stdout.buffer.write(os.urandom(4))
1478 """.format(TESTFN=support.TESTFN)
1479 rc, out, err = assert_python_ok('-Sc', code)
1480 self.assertEqual(len(out), 8)
1481 self.assertNotEqual(out[0:4], out[4:8])
1482 rc, out2, err2 = assert_python_ok('-Sc', code)
1483 self.assertEqual(len(out2), 8)
1484 self.assertNotEqual(out2, out)
1485
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001486
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001487@contextlib.contextmanager
1488def _execvpe_mockup(defpath=None):
1489 """
1490 Stubs out execv and execve functions when used as context manager.
1491 Records exec calls. The mock execv and execve functions always raise an
1492 exception as they would normally never return.
1493 """
1494 # A list of tuples containing (function name, first arg, args)
1495 # of calls to execv or execve that have been made.
1496 calls = []
1497
1498 def mock_execv(name, *args):
1499 calls.append(('execv', name, args))
1500 raise RuntimeError("execv called")
1501
1502 def mock_execve(name, *args):
1503 calls.append(('execve', name, args))
1504 raise OSError(errno.ENOTDIR, "execve called")
1505
1506 try:
1507 orig_execv = os.execv
1508 orig_execve = os.execve
1509 orig_defpath = os.defpath
1510 os.execv = mock_execv
1511 os.execve = mock_execve
1512 if defpath is not None:
1513 os.defpath = defpath
1514 yield calls
1515 finally:
1516 os.execv = orig_execv
1517 os.execve = orig_execve
1518 os.defpath = orig_defpath
1519
Victor Stinner4659ccf2016-09-14 10:57:00 +02001520
Guido van Rossume7ba4952007-06-06 23:52:48 +00001521class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001522 @unittest.skipIf(USING_LINUXTHREADS,
1523 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001524 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001525 self.assertRaises(OSError, os.execvpe, 'no such app-',
1526 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001527
Steve Dowerbce26262016-11-19 19:17:26 -08001528 def test_execv_with_bad_arglist(self):
1529 self.assertRaises(ValueError, os.execv, 'notepad', ())
1530 self.assertRaises(ValueError, os.execv, 'notepad', [])
1531 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1532 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1533
Thomas Heller6790d602007-08-30 17:15:14 +00001534 def test_execvpe_with_bad_arglist(self):
1535 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001536 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1537 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001538
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001539 @unittest.skipUnless(hasattr(os, '_execvpe'),
1540 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001541 def _test_internal_execvpe(self, test_type):
1542 program_path = os.sep + 'absolutepath'
1543 if test_type is bytes:
1544 program = b'executable'
1545 fullpath = os.path.join(os.fsencode(program_path), program)
1546 native_fullpath = fullpath
1547 arguments = [b'progname', 'arg1', 'arg2']
1548 else:
1549 program = 'executable'
1550 arguments = ['progname', 'arg1', 'arg2']
1551 fullpath = os.path.join(program_path, program)
1552 if os.name != "nt":
1553 native_fullpath = os.fsencode(fullpath)
1554 else:
1555 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001556 env = {'spam': 'beans'}
1557
Victor Stinnerb745a742010-05-18 17:17:23 +00001558 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001559 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001560 self.assertRaises(RuntimeError,
1561 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001562 self.assertEqual(len(calls), 1)
1563 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1564
Victor Stinnerb745a742010-05-18 17:17:23 +00001565 # test os._execvpe() with a relative path:
1566 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001567 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001568 self.assertRaises(OSError,
1569 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001570 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001571 self.assertSequenceEqual(calls[0],
1572 ('execve', native_fullpath, (arguments, env)))
1573
1574 # test os._execvpe() with a relative path:
1575 # os.get_exec_path() reads the 'PATH' variable
1576 with _execvpe_mockup() as calls:
1577 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001578 if test_type is bytes:
1579 env_path[b'PATH'] = program_path
1580 else:
1581 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001582 self.assertRaises(OSError,
1583 os._execvpe, program, arguments, env=env_path)
1584 self.assertEqual(len(calls), 1)
1585 self.assertSequenceEqual(calls[0],
1586 ('execve', native_fullpath, (arguments, env_path)))
1587
1588 def test_internal_execvpe_str(self):
1589 self._test_internal_execvpe(str)
1590 if os.name != "nt":
1591 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001592
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001593 def test_execve_invalid_env(self):
1594 args = [sys.executable, '-c', 'pass']
1595
Ville Skyttä49b27342017-08-03 09:00:59 +03001596 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001597 newenv = os.environ.copy()
1598 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1599 with self.assertRaises(ValueError):
1600 os.execve(args[0], args, newenv)
1601
Ville Skyttä49b27342017-08-03 09:00:59 +03001602 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001603 newenv = os.environ.copy()
1604 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1605 with self.assertRaises(ValueError):
1606 os.execve(args[0], args, newenv)
1607
Ville Skyttä49b27342017-08-03 09:00:59 +03001608 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001609 newenv = os.environ.copy()
1610 newenv["FRUIT=ORANGE"] = "lemon"
1611 with self.assertRaises(ValueError):
1612 os.execve(args[0], args, newenv)
1613
Alexey Izbyshev83460312018-10-20 03:28:22 +03001614 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1615 def test_execve_with_empty_path(self):
1616 # bpo-32890: Check GetLastError() misuse
1617 try:
1618 os.execve('', ['arg'], {})
1619 except OSError as e:
1620 self.assertTrue(e.winerror is None or e.winerror != 0)
1621 else:
1622 self.fail('No OSError raised')
1623
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001624
Serhiy Storchaka43767632013-11-03 21:31:38 +02001625@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001626class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001627 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001628 try:
1629 os.stat(support.TESTFN)
1630 except FileNotFoundError:
1631 exists = False
1632 except OSError as exc:
1633 exists = True
1634 self.fail("file %s must not exist; os.stat failed with %s"
1635 % (support.TESTFN, exc))
1636 else:
1637 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001638
Thomas Wouters477c8d52006-05-27 19:21:47 +00001639 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001640 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001641
1642 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001643 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001644
1645 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001646 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001647
1648 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001649 self.addCleanup(support.unlink, support.TESTFN)
1650
Victor Stinnere77c9742016-03-25 10:28:23 +01001651 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001652 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001653
1654 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001655 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001656
Thomas Wouters477c8d52006-05-27 19:21:47 +00001657 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001658 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001659
Victor Stinnere77c9742016-03-25 10:28:23 +01001660
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001661class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001662 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001663 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1664 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001665 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001666 def get_single(f):
1667 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001668 if hasattr(os, f):
1669 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001670 return helper
1671 for f in singles:
1672 locals()["test_"+f] = get_single(f)
1673
Benjamin Peterson7522c742009-01-19 21:00:09 +00001674 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001675 try:
1676 f(support.make_bad_fd(), *args)
1677 except OSError as e:
1678 self.assertEqual(e.errno, errno.EBADF)
1679 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001680 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001681 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001682
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001686
Serhiy Storchaka43767632013-11-03 21:31:38 +02001687 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001688 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001689 fd = support.make_bad_fd()
1690 # Make sure none of the descriptors we are about to close are
1691 # currently valid (issue 6542).
1692 for i in range(10):
1693 try: os.fstat(fd+i)
1694 except OSError:
1695 pass
1696 else:
1697 break
1698 if i < 2:
1699 raise unittest.SkipTest(
1700 "Unable to acquire a range of invalid file descriptors")
1701 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001702
Serhiy Storchaka43767632013-11-03 21:31:38 +02001703 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001704 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001705 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001706
Serhiy Storchaka43767632013-11-03 21:31:38 +02001707 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001708 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001709 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001710
Serhiy Storchaka43767632013-11-03 21:31:38 +02001711 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001712 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001713 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001714
Serhiy Storchaka43767632013-11-03 21:31:38 +02001715 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001716 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001717 self.check(os.pathconf, "PC_NAME_MAX")
1718 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001719
Serhiy Storchaka43767632013-11-03 21:31:38 +02001720 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001721 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001722 self.check(os.truncate, 0)
1723 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001724
Serhiy Storchaka43767632013-11-03 21:31:38 +02001725 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001726 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001727 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001728
Serhiy Storchaka43767632013-11-03 21:31:38 +02001729 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001730 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001731 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001732
Victor Stinner57ddf782014-01-08 15:21:28 +01001733 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1734 def test_readv(self):
1735 buf = bytearray(10)
1736 self.check(os.readv, [buf])
1737
Serhiy Storchaka43767632013-11-03 21:31:38 +02001738 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001739 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001740 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001741
Serhiy Storchaka43767632013-11-03 21:31:38 +02001742 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001743 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001744 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001745
Victor Stinner57ddf782014-01-08 15:21:28 +01001746 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1747 def test_writev(self):
1748 self.check(os.writev, [b'abc'])
1749
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001750 def test_inheritable(self):
1751 self.check(os.get_inheritable)
1752 self.check(os.set_inheritable, True)
1753
1754 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1755 'needs os.get_blocking() and os.set_blocking()')
1756 def test_blocking(self):
1757 self.check(os.get_blocking)
1758 self.check(os.set_blocking, True)
1759
Brian Curtin1b9df392010-11-24 20:24:31 +00001760
1761class LinkTests(unittest.TestCase):
1762 def setUp(self):
1763 self.file1 = support.TESTFN
1764 self.file2 = os.path.join(support.TESTFN + "2")
1765
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001766 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001767 for file in (self.file1, self.file2):
1768 if os.path.exists(file):
1769 os.unlink(file)
1770
Brian Curtin1b9df392010-11-24 20:24:31 +00001771 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001772 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001773
xdegaye6a55d092017-11-12 17:57:04 +01001774 try:
1775 os.link(file1, file2)
1776 except PermissionError as e:
1777 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001778 with open(file1, "r") as f1, open(file2, "r") as f2:
1779 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1780
1781 def test_link(self):
1782 self._test_link(self.file1, self.file2)
1783
1784 def test_link_bytes(self):
1785 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1786 bytes(self.file2, sys.getfilesystemencoding()))
1787
Brian Curtinf498b752010-11-30 15:54:04 +00001788 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001789 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001790 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001791 except UnicodeError:
1792 raise unittest.SkipTest("Unable to encode for this platform.")
1793
Brian Curtinf498b752010-11-30 15:54:04 +00001794 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001795 self.file2 = self.file1 + "2"
1796 self._test_link(self.file1, self.file2)
1797
Serhiy Storchaka43767632013-11-03 21:31:38 +02001798@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1799class PosixUidGidTests(unittest.TestCase):
1800 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1801 def test_setuid(self):
1802 if os.getuid() != 0:
1803 self.assertRaises(OSError, os.setuid, 0)
1804 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001805
Serhiy Storchaka43767632013-11-03 21:31:38 +02001806 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1807 def test_setgid(self):
1808 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1809 self.assertRaises(OSError, os.setgid, 0)
1810 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001811
Serhiy Storchaka43767632013-11-03 21:31:38 +02001812 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1813 def test_seteuid(self):
1814 if os.getuid() != 0:
1815 self.assertRaises(OSError, os.seteuid, 0)
1816 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001817
Serhiy Storchaka43767632013-11-03 21:31:38 +02001818 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1819 def test_setegid(self):
1820 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1821 self.assertRaises(OSError, os.setegid, 0)
1822 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001823
Serhiy Storchaka43767632013-11-03 21:31:38 +02001824 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1825 def test_setreuid(self):
1826 if os.getuid() != 0:
1827 self.assertRaises(OSError, os.setreuid, 0, 0)
1828 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1829 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001830
Serhiy Storchaka43767632013-11-03 21:31:38 +02001831 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1832 def test_setreuid_neg1(self):
1833 # Needs to accept -1. We run this in a subprocess to avoid
1834 # altering the test runner's process state (issue8045).
1835 subprocess.check_call([
1836 sys.executable, '-c',
1837 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001838
Serhiy Storchaka43767632013-11-03 21:31:38 +02001839 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1840 def test_setregid(self):
1841 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1842 self.assertRaises(OSError, os.setregid, 0, 0)
1843 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1844 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001845
Serhiy Storchaka43767632013-11-03 21:31:38 +02001846 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1847 def test_setregid_neg1(self):
1848 # Needs to accept -1. We run this in a subprocess to avoid
1849 # altering the test runner's process state (issue8045).
1850 subprocess.check_call([
1851 sys.executable, '-c',
1852 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001853
Serhiy Storchaka43767632013-11-03 21:31:38 +02001854@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1855class Pep383Tests(unittest.TestCase):
1856 def setUp(self):
1857 if support.TESTFN_UNENCODABLE:
1858 self.dir = support.TESTFN_UNENCODABLE
1859 elif support.TESTFN_NONASCII:
1860 self.dir = support.TESTFN_NONASCII
1861 else:
1862 self.dir = support.TESTFN
1863 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001864
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865 bytesfn = []
1866 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001867 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001868 fn = os.fsencode(fn)
1869 except UnicodeEncodeError:
1870 return
1871 bytesfn.append(fn)
1872 add_filename(support.TESTFN_UNICODE)
1873 if support.TESTFN_UNENCODABLE:
1874 add_filename(support.TESTFN_UNENCODABLE)
1875 if support.TESTFN_NONASCII:
1876 add_filename(support.TESTFN_NONASCII)
1877 if not bytesfn:
1878 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001879
Serhiy Storchaka43767632013-11-03 21:31:38 +02001880 self.unicodefn = set()
1881 os.mkdir(self.dir)
1882 try:
1883 for fn in bytesfn:
1884 support.create_empty_file(os.path.join(self.bdir, fn))
1885 fn = os.fsdecode(fn)
1886 if fn in self.unicodefn:
1887 raise ValueError("duplicate filename")
1888 self.unicodefn.add(fn)
1889 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001890 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001891 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001892
Serhiy Storchaka43767632013-11-03 21:31:38 +02001893 def tearDown(self):
1894 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001895
Serhiy Storchaka43767632013-11-03 21:31:38 +02001896 def test_listdir(self):
1897 expected = self.unicodefn
1898 found = set(os.listdir(self.dir))
1899 self.assertEqual(found, expected)
1900 # test listdir without arguments
1901 current_directory = os.getcwd()
1902 try:
1903 os.chdir(os.sep)
1904 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1905 finally:
1906 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001907
Serhiy Storchaka43767632013-11-03 21:31:38 +02001908 def test_open(self):
1909 for fn in self.unicodefn:
1910 f = open(os.path.join(self.dir, fn), 'rb')
1911 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001912
Serhiy Storchaka43767632013-11-03 21:31:38 +02001913 @unittest.skipUnless(hasattr(os, 'statvfs'),
1914 "need os.statvfs()")
1915 def test_statvfs(self):
1916 # issue #9645
1917 for fn in self.unicodefn:
1918 # should not fail with file not found error
1919 fullname = os.path.join(self.dir, fn)
1920 os.statvfs(fullname)
1921
1922 def test_stat(self):
1923 for fn in self.unicodefn:
1924 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001925
Brian Curtineb24d742010-04-12 17:16:38 +00001926@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1927class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001928 def _kill(self, sig):
1929 # Start sys.executable as a subprocess and communicate from the
1930 # subprocess to the parent that the interpreter is ready. When it
1931 # becomes ready, send *sig* via os.kill to the subprocess and check
1932 # that the return code is equal to *sig*.
1933 import ctypes
1934 from ctypes import wintypes
1935 import msvcrt
1936
1937 # Since we can't access the contents of the process' stdout until the
1938 # process has exited, use PeekNamedPipe to see what's inside stdout
1939 # without waiting. This is done so we can tell that the interpreter
1940 # is started and running at a point where it could handle a signal.
1941 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1942 PeekNamedPipe.restype = wintypes.BOOL
1943 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1944 ctypes.POINTER(ctypes.c_char), # stdout buf
1945 wintypes.DWORD, # Buffer size
1946 ctypes.POINTER(wintypes.DWORD), # bytes read
1947 ctypes.POINTER(wintypes.DWORD), # bytes avail
1948 ctypes.POINTER(wintypes.DWORD)) # bytes left
1949 msg = "running"
1950 proc = subprocess.Popen([sys.executable, "-c",
1951 "import sys;"
1952 "sys.stdout.write('{}');"
1953 "sys.stdout.flush();"
1954 "input()".format(msg)],
1955 stdout=subprocess.PIPE,
1956 stderr=subprocess.PIPE,
1957 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001958 self.addCleanup(proc.stdout.close)
1959 self.addCleanup(proc.stderr.close)
1960 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001961
1962 count, max = 0, 100
1963 while count < max and proc.poll() is None:
1964 # Create a string buffer to store the result of stdout from the pipe
1965 buf = ctypes.create_string_buffer(len(msg))
1966 # Obtain the text currently in proc.stdout
1967 # Bytes read/avail/left are left as NULL and unused
1968 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1969 buf, ctypes.sizeof(buf), None, None, None)
1970 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1971 if buf.value:
1972 self.assertEqual(msg, buf.value.decode())
1973 break
1974 time.sleep(0.1)
1975 count += 1
1976 else:
1977 self.fail("Did not receive communication from the subprocess")
1978
Brian Curtineb24d742010-04-12 17:16:38 +00001979 os.kill(proc.pid, sig)
1980 self.assertEqual(proc.wait(), sig)
1981
1982 def test_kill_sigterm(self):
1983 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001984 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001985
1986 def test_kill_int(self):
1987 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001988 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001989
1990 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001991 tagname = "test_os_%s" % uuid.uuid1()
1992 m = mmap.mmap(-1, 1, tagname)
1993 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001994 # Run a script which has console control handling enabled.
1995 proc = subprocess.Popen([sys.executable,
1996 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001997 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001998 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1999 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002000 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002001 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002002 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002003 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002004 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002005 count += 1
2006 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002007 # Forcefully kill the process if we weren't able to signal it.
2008 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002009 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002010 os.kill(proc.pid, event)
2011 # proc.send_signal(event) could also be done here.
2012 # Allow time for the signal to be passed and the process to exit.
2013 time.sleep(0.5)
2014 if not proc.poll():
2015 # Forcefully kill the process if we weren't able to signal it.
2016 os.kill(proc.pid, signal.SIGINT)
2017 self.fail("subprocess did not stop on {}".format(name))
2018
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002019 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002020 def test_CTRL_C_EVENT(self):
2021 from ctypes import wintypes
2022 import ctypes
2023
2024 # Make a NULL value by creating a pointer with no argument.
2025 NULL = ctypes.POINTER(ctypes.c_int)()
2026 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2027 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2028 wintypes.BOOL)
2029 SetConsoleCtrlHandler.restype = wintypes.BOOL
2030
2031 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002032 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002033 # by subprocesses.
2034 SetConsoleCtrlHandler(NULL, 0)
2035
2036 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2037
2038 def test_CTRL_BREAK_EVENT(self):
2039 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2040
2041
Brian Curtind40e6f72010-07-08 21:39:08 +00002042@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002043class Win32ListdirTests(unittest.TestCase):
2044 """Test listdir on Windows."""
2045
2046 def setUp(self):
2047 self.created_paths = []
2048 for i in range(2):
2049 dir_name = 'SUB%d' % i
2050 dir_path = os.path.join(support.TESTFN, dir_name)
2051 file_name = 'FILE%d' % i
2052 file_path = os.path.join(support.TESTFN, file_name)
2053 os.makedirs(dir_path)
2054 with open(file_path, 'w') as f:
2055 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2056 self.created_paths.extend([dir_name, file_name])
2057 self.created_paths.sort()
2058
2059 def tearDown(self):
2060 shutil.rmtree(support.TESTFN)
2061
2062 def test_listdir_no_extended_path(self):
2063 """Test when the path is not an "extended" path."""
2064 # unicode
2065 self.assertEqual(
2066 sorted(os.listdir(support.TESTFN)),
2067 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002068
Tim Golden781bbeb2013-10-25 20:24:06 +01002069 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002070 self.assertEqual(
2071 sorted(os.listdir(os.fsencode(support.TESTFN))),
2072 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002073
2074 def test_listdir_extended_path(self):
2075 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002076 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002077 # unicode
2078 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2079 self.assertEqual(
2080 sorted(os.listdir(path)),
2081 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002082
Tim Golden781bbeb2013-10-25 20:24:06 +01002083 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002084 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2085 self.assertEqual(
2086 sorted(os.listdir(path)),
2087 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002088
2089
Berker Peksage0b5b202018-08-15 13:03:41 +03002090@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2091class ReadlinkTests(unittest.TestCase):
2092 filelink = 'readlinktest'
2093 filelink_target = os.path.abspath(__file__)
2094 filelinkb = os.fsencode(filelink)
2095 filelinkb_target = os.fsencode(filelink_target)
2096
2097 def setUp(self):
2098 self.assertTrue(os.path.exists(self.filelink_target))
2099 self.assertTrue(os.path.exists(self.filelinkb_target))
2100 self.assertFalse(os.path.exists(self.filelink))
2101 self.assertFalse(os.path.exists(self.filelinkb))
2102
2103 def test_not_symlink(self):
2104 filelink_target = FakePath(self.filelink_target)
2105 self.assertRaises(OSError, os.readlink, self.filelink_target)
2106 self.assertRaises(OSError, os.readlink, filelink_target)
2107
2108 def test_missing_link(self):
2109 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2110 self.assertRaises(FileNotFoundError, os.readlink,
2111 FakePath('missing-link'))
2112
2113 @support.skip_unless_symlink
2114 def test_pathlike(self):
2115 os.symlink(self.filelink_target, self.filelink)
2116 self.addCleanup(support.unlink, self.filelink)
2117 filelink = FakePath(self.filelink)
2118 self.assertEqual(os.readlink(filelink), self.filelink_target)
2119
2120 @support.skip_unless_symlink
2121 def test_pathlike_bytes(self):
2122 os.symlink(self.filelinkb_target, self.filelinkb)
2123 self.addCleanup(support.unlink, self.filelinkb)
2124 path = os.readlink(FakePath(self.filelinkb))
2125 self.assertEqual(path, self.filelinkb_target)
2126 self.assertIsInstance(path, bytes)
2127
2128 @support.skip_unless_symlink
2129 def test_bytes(self):
2130 os.symlink(self.filelinkb_target, self.filelinkb)
2131 self.addCleanup(support.unlink, self.filelinkb)
2132 path = os.readlink(self.filelinkb)
2133 self.assertEqual(path, self.filelinkb_target)
2134 self.assertIsInstance(path, bytes)
2135
2136
Tim Golden781bbeb2013-10-25 20:24:06 +01002137@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002138@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002139class Win32SymlinkTests(unittest.TestCase):
2140 filelink = 'filelinktest'
2141 filelink_target = os.path.abspath(__file__)
2142 dirlink = 'dirlinktest'
2143 dirlink_target = os.path.dirname(filelink_target)
2144 missing_link = 'missing link'
2145
2146 def setUp(self):
2147 assert os.path.exists(self.dirlink_target)
2148 assert os.path.exists(self.filelink_target)
2149 assert not os.path.exists(self.dirlink)
2150 assert not os.path.exists(self.filelink)
2151 assert not os.path.exists(self.missing_link)
2152
2153 def tearDown(self):
2154 if os.path.exists(self.filelink):
2155 os.remove(self.filelink)
2156 if os.path.exists(self.dirlink):
2157 os.rmdir(self.dirlink)
2158 if os.path.lexists(self.missing_link):
2159 os.remove(self.missing_link)
2160
2161 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002162 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002163 self.assertTrue(os.path.exists(self.dirlink))
2164 self.assertTrue(os.path.isdir(self.dirlink))
2165 self.assertTrue(os.path.islink(self.dirlink))
2166 self.check_stat(self.dirlink, self.dirlink_target)
2167
2168 def test_file_link(self):
2169 os.symlink(self.filelink_target, self.filelink)
2170 self.assertTrue(os.path.exists(self.filelink))
2171 self.assertTrue(os.path.isfile(self.filelink))
2172 self.assertTrue(os.path.islink(self.filelink))
2173 self.check_stat(self.filelink, self.filelink_target)
2174
2175 def _create_missing_dir_link(self):
2176 'Create a "directory" link to a non-existent target'
2177 linkname = self.missing_link
2178 if os.path.lexists(linkname):
2179 os.remove(linkname)
2180 target = r'c:\\target does not exist.29r3c740'
2181 assert not os.path.exists(target)
2182 target_is_dir = True
2183 os.symlink(target, linkname, target_is_dir)
2184
2185 def test_remove_directory_link_to_missing_target(self):
2186 self._create_missing_dir_link()
2187 # For compatibility with Unix, os.remove will check the
2188 # directory status and call RemoveDirectory if the symlink
2189 # was created with target_is_dir==True.
2190 os.remove(self.missing_link)
2191
2192 @unittest.skip("currently fails; consider for improvement")
2193 def test_isdir_on_directory_link_to_missing_target(self):
2194 self._create_missing_dir_link()
2195 # consider having isdir return true for directory links
2196 self.assertTrue(os.path.isdir(self.missing_link))
2197
2198 @unittest.skip("currently fails; consider for improvement")
2199 def test_rmdir_on_directory_link_to_missing_target(self):
2200 self._create_missing_dir_link()
2201 # consider allowing rmdir to remove directory links
2202 os.rmdir(self.missing_link)
2203
2204 def check_stat(self, link, target):
2205 self.assertEqual(os.stat(link), os.stat(target))
2206 self.assertNotEqual(os.lstat(link), os.stat(link))
2207
Brian Curtind25aef52011-06-13 15:16:04 -05002208 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002209 self.assertEqual(os.stat(bytes_link), os.stat(target))
2210 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002211
2212 def test_12084(self):
2213 level1 = os.path.abspath(support.TESTFN)
2214 level2 = os.path.join(level1, "level2")
2215 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002216 self.addCleanup(support.rmtree, level1)
2217
2218 os.mkdir(level1)
2219 os.mkdir(level2)
2220 os.mkdir(level3)
2221
2222 file1 = os.path.abspath(os.path.join(level1, "file1"))
2223 create_file(file1)
2224
2225 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002226 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002227 os.chdir(level2)
2228 link = os.path.join(level2, "link")
2229 os.symlink(os.path.relpath(file1), "link")
2230 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002231
Victor Stinnerae39d232016-03-24 17:12:55 +01002232 # Check os.stat calls from the same dir as the link
2233 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002234
Victor Stinnerae39d232016-03-24 17:12:55 +01002235 # Check os.stat calls from a dir below the link
2236 os.chdir(level1)
2237 self.assertEqual(os.stat(file1),
2238 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002239
Victor Stinnerae39d232016-03-24 17:12:55 +01002240 # Check os.stat calls from a dir above the link
2241 os.chdir(level3)
2242 self.assertEqual(os.stat(file1),
2243 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002244 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002245 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002246
SSE43c34aad2018-02-13 00:10:35 +07002247 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2248 and os.path.exists(r'C:\ProgramData'),
2249 'Test directories not found')
2250 def test_29248(self):
2251 # os.symlink() calls CreateSymbolicLink, which creates
2252 # the reparse data buffer with the print name stored
2253 # first, so the offset is always 0. CreateSymbolicLink
2254 # stores the "PrintName" DOS path (e.g. "C:\") first,
2255 # with an offset of 0, followed by the "SubstituteName"
2256 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2257 # the other hand, seems to have been created manually
2258 # with an inverted order.
2259 target = os.readlink(r'C:\Users\All Users')
2260 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2261
Steve Dower6921e732018-03-05 14:26:08 -08002262 def test_buffer_overflow(self):
2263 # Older versions would have a buffer overflow when detecting
2264 # whether a link source was a directory. This test ensures we
2265 # no longer crash, but does not otherwise validate the behavior
2266 segment = 'X' * 27
2267 path = os.path.join(*[segment] * 10)
2268 test_cases = [
2269 # overflow with absolute src
2270 ('\\' + path, segment),
2271 # overflow dest with relative src
2272 (segment, path),
2273 # overflow when joining src
2274 (path[:180], path[:180]),
2275 ]
2276 for src, dest in test_cases:
2277 try:
2278 os.symlink(src, dest)
2279 except FileNotFoundError:
2280 pass
2281 else:
2282 try:
2283 os.remove(dest)
2284 except OSError:
2285 pass
2286 # Also test with bytes, since that is a separate code path.
2287 try:
2288 os.symlink(os.fsencode(src), os.fsencode(dest))
2289 except FileNotFoundError:
2290 pass
2291 else:
2292 try:
2293 os.remove(dest)
2294 except OSError:
2295 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002296
Tim Golden0321cf22014-05-05 19:46:17 +01002297@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2298class Win32JunctionTests(unittest.TestCase):
2299 junction = 'junctiontest'
2300 junction_target = os.path.dirname(os.path.abspath(__file__))
2301
2302 def setUp(self):
2303 assert os.path.exists(self.junction_target)
2304 assert not os.path.exists(self.junction)
2305
2306 def tearDown(self):
2307 if os.path.exists(self.junction):
2308 # os.rmdir delegates to Windows' RemoveDirectoryW,
2309 # which removes junction points safely.
2310 os.rmdir(self.junction)
2311
2312 def test_create_junction(self):
2313 _winapi.CreateJunction(self.junction_target, self.junction)
2314 self.assertTrue(os.path.exists(self.junction))
2315 self.assertTrue(os.path.isdir(self.junction))
2316
2317 # Junctions are not recognized as links.
2318 self.assertFalse(os.path.islink(self.junction))
2319
2320 def test_unlink_removes_junction(self):
2321 _winapi.CreateJunction(self.junction_target, self.junction)
2322 self.assertTrue(os.path.exists(self.junction))
2323
2324 os.unlink(self.junction)
2325 self.assertFalse(os.path.exists(self.junction))
2326
Mark Becwarb82bfac2019-02-02 16:08:23 -05002327@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2328class Win32NtTests(unittest.TestCase):
2329 def setUp(self):
2330 from test import support
2331 self.nt = support.import_module('nt')
2332 pass
2333
2334 def tearDown(self):
2335 pass
2336
2337 def test_getfinalpathname_handles(self):
2338 try:
2339 import ctypes, ctypes.wintypes
2340 except ImportError:
2341 raise unittest.SkipTest('ctypes module is required for this test')
2342
2343 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2344 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2345
2346 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2347 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2348 ctypes.wintypes.LPDWORD)
2349
2350 # This is a pseudo-handle that doesn't need to be closed
2351 hproc = kernel.GetCurrentProcess()
2352
2353 handle_count = ctypes.wintypes.DWORD()
2354 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2355 self.assertEqual(1, ok)
2356
2357 before_count = handle_count.value
2358
2359 # The first two test the error path, __file__ tests the success path
2360 filenames = [ r'\\?\C:',
2361 r'\\?\NUL',
2362 r'\\?\CONIN',
2363 __file__ ]
2364
2365 for i in range(10):
2366 for name in filenames:
2367 try:
2368 tmp = self.nt._getfinalpathname(name)
2369 except:
2370 # Failure is expected
2371 pass
2372 try:
2373 tmp = os.stat(name)
2374 except:
2375 pass
2376
2377 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2378 self.assertEqual(1, ok)
2379
2380 handle_delta = handle_count.value - before_count
2381
2382 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002383
Jason R. Coombs3a092862013-05-27 23:21:28 -04002384@support.skip_unless_symlink
2385class NonLocalSymlinkTests(unittest.TestCase):
2386
2387 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002388 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002389 Create this structure:
2390
2391 base
2392 \___ some_dir
2393 """
2394 os.makedirs('base/some_dir')
2395
2396 def tearDown(self):
2397 shutil.rmtree('base')
2398
2399 def test_directory_link_nonlocal(self):
2400 """
2401 The symlink target should resolve relative to the link, not relative
2402 to the current directory.
2403
2404 Then, link base/some_link -> base/some_dir and ensure that some_link
2405 is resolved as a directory.
2406
2407 In issue13772, it was discovered that directory detection failed if
2408 the symlink target was not specified relative to the current
2409 directory, which was a defect in the implementation.
2410 """
2411 src = os.path.join('base', 'some_link')
2412 os.symlink('some_dir', src)
2413 assert os.path.isdir(src)
2414
2415
Victor Stinnere8d51452010-08-19 01:05:19 +00002416class FSEncodingTests(unittest.TestCase):
2417 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002418 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2419 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002420
Victor Stinnere8d51452010-08-19 01:05:19 +00002421 def test_identity(self):
2422 # assert fsdecode(fsencode(x)) == x
2423 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2424 try:
2425 bytesfn = os.fsencode(fn)
2426 except UnicodeEncodeError:
2427 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002428 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002429
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002430
Brett Cannonefb00c02012-02-29 18:31:31 -05002431
2432class DeviceEncodingTests(unittest.TestCase):
2433
2434 def test_bad_fd(self):
2435 # Return None when an fd doesn't actually exist.
2436 self.assertIsNone(os.device_encoding(123456))
2437
Philip Jenveye308b7c2012-02-29 16:16:15 -08002438 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2439 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002440 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002441 def test_device_encoding(self):
2442 encoding = os.device_encoding(0)
2443 self.assertIsNotNone(encoding)
2444 self.assertTrue(codecs.lookup(encoding))
2445
2446
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002447class PidTests(unittest.TestCase):
2448 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2449 def test_getppid(self):
2450 p = subprocess.Popen([sys.executable, '-c',
2451 'import os; print(os.getppid())'],
2452 stdout=subprocess.PIPE)
2453 stdout, _ = p.communicate()
2454 # We are the parent of our subprocess
2455 self.assertEqual(int(stdout), os.getpid())
2456
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002457 def test_waitpid(self):
2458 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002459 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002460 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002461 status = os.waitpid(pid, 0)
2462 self.assertEqual(status, (pid, 0))
2463
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002464
Victor Stinner4659ccf2016-09-14 10:57:00 +02002465class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002466 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002467 self.exitcode = 17
2468
2469 filename = support.TESTFN
2470 self.addCleanup(support.unlink, filename)
2471
2472 if not with_env:
2473 code = 'import sys; sys.exit(%s)' % self.exitcode
2474 else:
2475 self.env = dict(os.environ)
2476 # create an unique key
2477 self.key = str(uuid.uuid4())
2478 self.env[self.key] = self.key
2479 # read the variable from os.environ to check that it exists
2480 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2481 % (self.key, self.exitcode))
2482
2483 with open(filename, "w") as fp:
2484 fp.write(code)
2485
Berker Peksag81816462016-09-15 20:19:47 +03002486 args = [sys.executable, filename]
2487 if use_bytes:
2488 args = [os.fsencode(a) for a in args]
2489 self.env = {os.fsencode(k): os.fsencode(v)
2490 for k, v in self.env.items()}
2491
2492 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002493
Berker Peksag4af23d72016-09-15 20:32:44 +03002494 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002495 def test_spawnl(self):
2496 args = self.create_args()
2497 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2498 self.assertEqual(exitcode, self.exitcode)
2499
Berker Peksag4af23d72016-09-15 20:32:44 +03002500 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002501 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002502 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002503 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2504 self.assertEqual(exitcode, self.exitcode)
2505
Berker Peksag4af23d72016-09-15 20:32:44 +03002506 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002507 def test_spawnlp(self):
2508 args = self.create_args()
2509 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2510 self.assertEqual(exitcode, self.exitcode)
2511
Berker Peksag4af23d72016-09-15 20:32:44 +03002512 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002513 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002514 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002515 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2516 self.assertEqual(exitcode, self.exitcode)
2517
Berker Peksag4af23d72016-09-15 20:32:44 +03002518 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002519 def test_spawnv(self):
2520 args = self.create_args()
2521 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2522 self.assertEqual(exitcode, self.exitcode)
2523
Berker Peksag4af23d72016-09-15 20:32:44 +03002524 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002525 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002526 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002527 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2528 self.assertEqual(exitcode, self.exitcode)
2529
Berker Peksag4af23d72016-09-15 20:32:44 +03002530 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002531 def test_spawnvp(self):
2532 args = self.create_args()
2533 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2534 self.assertEqual(exitcode, self.exitcode)
2535
Berker Peksag4af23d72016-09-15 20:32:44 +03002536 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002537 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002538 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002539 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2540 self.assertEqual(exitcode, self.exitcode)
2541
Berker Peksag4af23d72016-09-15 20:32:44 +03002542 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002543 def test_nowait(self):
2544 args = self.create_args()
2545 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2546 result = os.waitpid(pid, 0)
2547 self.assertEqual(result[0], pid)
2548 status = result[1]
2549 if hasattr(os, 'WIFEXITED'):
2550 self.assertTrue(os.WIFEXITED(status))
2551 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2552 else:
2553 self.assertEqual(status, self.exitcode << 8)
2554
Berker Peksag4af23d72016-09-15 20:32:44 +03002555 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002556 def test_spawnve_bytes(self):
2557 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2558 args = self.create_args(with_env=True, use_bytes=True)
2559 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2560 self.assertEqual(exitcode, self.exitcode)
2561
Steve Dower859fd7b2016-11-19 18:53:19 -08002562 @requires_os_func('spawnl')
2563 def test_spawnl_noargs(self):
2564 args = self.create_args()
2565 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002566 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002567
2568 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002569 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002570 args = self.create_args()
2571 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002572 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002573
2574 @requires_os_func('spawnv')
2575 def test_spawnv_noargs(self):
2576 args = self.create_args()
2577 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2578 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002579 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2580 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002581
2582 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002583 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002584 args = self.create_args()
2585 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2586 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002587 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2588 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002589
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002590 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002591 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002592
Ville Skyttä49b27342017-08-03 09:00:59 +03002593 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002594 newenv = os.environ.copy()
2595 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2596 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002597 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002598 except ValueError:
2599 pass
2600 else:
2601 self.assertEqual(exitcode, 127)
2602
Ville Skyttä49b27342017-08-03 09:00:59 +03002603 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002604 newenv = os.environ.copy()
2605 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2606 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002607 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002608 except ValueError:
2609 pass
2610 else:
2611 self.assertEqual(exitcode, 127)
2612
Ville Skyttä49b27342017-08-03 09:00:59 +03002613 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002614 newenv = os.environ.copy()
2615 newenv["FRUIT=ORANGE"] = "lemon"
2616 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002617 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002618 except ValueError:
2619 pass
2620 else:
2621 self.assertEqual(exitcode, 127)
2622
Ville Skyttä49b27342017-08-03 09:00:59 +03002623 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002624 filename = support.TESTFN
2625 self.addCleanup(support.unlink, filename)
2626 with open(filename, "w") as fp:
2627 fp.write('import sys, os\n'
2628 'if os.getenv("FRUIT") != "orange=lemon":\n'
2629 ' raise AssertionError')
2630 args = [sys.executable, filename]
2631 newenv = os.environ.copy()
2632 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002633 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002634 self.assertEqual(exitcode, 0)
2635
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002636 @requires_os_func('spawnve')
2637 def test_spawnve_invalid_env(self):
2638 self._test_invalid_env(os.spawnve)
2639
2640 @requires_os_func('spawnvpe')
2641 def test_spawnvpe_invalid_env(self):
2642 self._test_invalid_env(os.spawnvpe)
2643
Serhiy Storchaka77703942017-06-25 07:33:01 +03002644
Brian Curtin0151b8e2010-09-24 13:43:43 +00002645# The introduction of this TestCase caused at least two different errors on
2646# *nix buildbots. Temporarily skip this to let the buildbots move along.
2647@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002648@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2649class LoginTests(unittest.TestCase):
2650 def test_getlogin(self):
2651 user_name = os.getlogin()
2652 self.assertNotEqual(len(user_name), 0)
2653
2654
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002655@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2656 "needs os.getpriority and os.setpriority")
2657class ProgramPriorityTests(unittest.TestCase):
2658 """Tests for os.getpriority() and os.setpriority()."""
2659
2660 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002661
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002662 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2663 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2664 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002665 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2666 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002667 raise unittest.SkipTest("unable to reliably test setpriority "
2668 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002669 else:
2670 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002671 finally:
2672 try:
2673 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2674 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002675 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002676 raise
2677
2678
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002679class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002680
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002682
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683 def __init__(self, conn):
2684 asynchat.async_chat.__init__(self, conn)
2685 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002686 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687 self.closed = False
2688 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002689
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002690 def handle_read(self):
2691 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002692 if self.accumulate:
2693 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002694
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002695 def get_data(self):
2696 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002697
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002699 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002701
2702 def handle_error(self):
2703 raise
2704
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002705 def __init__(self, address):
2706 threading.Thread.__init__(self)
2707 asyncore.dispatcher.__init__(self)
2708 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2709 self.bind(address)
2710 self.listen(5)
2711 self.host, self.port = self.socket.getsockname()[:2]
2712 self.handler_instance = None
2713 self._active = False
2714 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002715
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002716 # --- public API
2717
2718 @property
2719 def running(self):
2720 return self._active
2721
2722 def start(self):
2723 assert not self.running
2724 self.__flag = threading.Event()
2725 threading.Thread.start(self)
2726 self.__flag.wait()
2727
2728 def stop(self):
2729 assert self.running
2730 self._active = False
2731 self.join()
2732
2733 def wait(self):
2734 # wait for handler connection to be closed, then stop the server
2735 while not getattr(self.handler_instance, "closed", False):
2736 time.sleep(0.001)
2737 self.stop()
2738
2739 # --- internals
2740
2741 def run(self):
2742 self._active = True
2743 self.__flag.set()
2744 while self._active and asyncore.socket_map:
2745 self._active_lock.acquire()
2746 asyncore.loop(timeout=0.001, count=1)
2747 self._active_lock.release()
2748 asyncore.close_all()
2749
2750 def handle_accept(self):
2751 conn, addr = self.accept()
2752 self.handler_instance = self.Handler(conn)
2753
2754 def handle_connect(self):
2755 self.close()
2756 handle_read = handle_connect
2757
2758 def writable(self):
2759 return 0
2760
2761 def handle_error(self):
2762 raise
2763
2764
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002765@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2766class TestSendfile(unittest.TestCase):
2767
Victor Stinner8c663fd2017-11-08 14:44:44 -08002768 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002769 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002770 not sys.platform.startswith("solaris") and \
2771 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002772 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2773 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002774 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2775 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002776
2777 @classmethod
2778 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002779 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002780 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002781
2782 @classmethod
2783 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002784 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002785 support.unlink(support.TESTFN)
2786
2787 def setUp(self):
2788 self.server = SendfileTestServer((support.HOST, 0))
2789 self.server.start()
2790 self.client = socket.socket()
2791 self.client.connect((self.server.host, self.server.port))
2792 self.client.settimeout(1)
2793 # synchronize by waiting for "220 ready" response
2794 self.client.recv(1024)
2795 self.sockno = self.client.fileno()
2796 self.file = open(support.TESTFN, 'rb')
2797 self.fileno = self.file.fileno()
2798
2799 def tearDown(self):
2800 self.file.close()
2801 self.client.close()
2802 if self.server.running:
2803 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002804 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002805
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002806 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002807 """A higher level wrapper representing how an application is
2808 supposed to use sendfile().
2809 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002810 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002811 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002812 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002813 except OSError as err:
2814 if err.errno == errno.ECONNRESET:
2815 # disconnected
2816 raise
2817 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2818 # we have to retry send data
2819 continue
2820 else:
2821 raise
2822
2823 def test_send_whole_file(self):
2824 # normal send
2825 total_sent = 0
2826 offset = 0
2827 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002828 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002829 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2830 if sent == 0:
2831 break
2832 offset += sent
2833 total_sent += sent
2834 self.assertTrue(sent <= nbytes)
2835 self.assertEqual(offset, total_sent)
2836
2837 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002838 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002839 self.client.close()
2840 self.server.wait()
2841 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002842 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002843 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002844
2845 def test_send_at_certain_offset(self):
2846 # start sending a file at a certain offset
2847 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002848 offset = len(self.DATA) // 2
2849 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002850 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002851 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002852 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2853 if sent == 0:
2854 break
2855 offset += sent
2856 total_sent += sent
2857 self.assertTrue(sent <= nbytes)
2858
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002859 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002860 self.client.close()
2861 self.server.wait()
2862 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002863 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002864 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002865 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002866 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002867
2868 def test_offset_overflow(self):
2869 # specify an offset > file size
2870 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002871 try:
2872 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2873 except OSError as e:
2874 # Solaris can raise EINVAL if offset >= file length, ignore.
2875 if e.errno != errno.EINVAL:
2876 raise
2877 else:
2878 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002879 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002880 self.client.close()
2881 self.server.wait()
2882 data = self.server.handler_instance.get_data()
2883 self.assertEqual(data, b'')
2884
2885 def test_invalid_offset(self):
2886 with self.assertRaises(OSError) as cm:
2887 os.sendfile(self.sockno, self.fileno, -1, 4096)
2888 self.assertEqual(cm.exception.errno, errno.EINVAL)
2889
Martin Panterbf19d162015-09-09 01:01:13 +00002890 def test_keywords(self):
2891 # Keyword arguments should be supported
2892 os.sendfile(out=self.sockno, offset=0, count=4096,
2893 **{'in': self.fileno})
2894 if self.SUPPORT_HEADERS_TRAILERS:
2895 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002896 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002897
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002898 # --- headers / trailers tests
2899
Serhiy Storchaka43767632013-11-03 21:31:38 +02002900 @requires_headers_trailers
2901 def test_headers(self):
2902 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002903 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02002904 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002905 headers=[b"x" * 512, b"y" * 256])
2906 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002907 total_sent += sent
2908 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002909 while total_sent < len(expected_data):
2910 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002911 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2912 offset, nbytes)
2913 if sent == 0:
2914 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002915 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002916 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002917 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002918
Serhiy Storchaka43767632013-11-03 21:31:38 +02002919 self.assertEqual(total_sent, len(expected_data))
2920 self.client.close()
2921 self.server.wait()
2922 data = self.server.handler_instance.get_data()
2923 self.assertEqual(hash(data), hash(expected_data))
2924
2925 @requires_headers_trailers
2926 def test_trailers(self):
2927 TESTFN2 = support.TESTFN + "2"
2928 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002929
2930 self.addCleanup(support.unlink, TESTFN2)
2931 create_file(TESTFN2, file_data)
2932
2933 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002934 os.sendfile(self.sockno, f.fileno(), 0, 5,
2935 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002936 self.client.close()
2937 self.server.wait()
2938 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002939 self.assertEqual(data, b"abcde123456789")
2940
2941 @requires_headers_trailers
2942 @requires_32b
2943 def test_headers_overflow_32bits(self):
2944 self.server.handler_instance.accumulate = False
2945 with self.assertRaises(OSError) as cm:
2946 os.sendfile(self.sockno, self.fileno, 0, 0,
2947 headers=[b"x" * 2**16] * 2**15)
2948 self.assertEqual(cm.exception.errno, errno.EINVAL)
2949
2950 @requires_headers_trailers
2951 @requires_32b
2952 def test_trailers_overflow_32bits(self):
2953 self.server.handler_instance.accumulate = False
2954 with self.assertRaises(OSError) as cm:
2955 os.sendfile(self.sockno, self.fileno, 0, 0,
2956 trailers=[b"x" * 2**16] * 2**15)
2957 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002958
Serhiy Storchaka43767632013-11-03 21:31:38 +02002959 @requires_headers_trailers
2960 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2961 'test needs os.SF_NODISKIO')
2962 def test_flags(self):
2963 try:
2964 os.sendfile(self.sockno, self.fileno, 0, 4096,
2965 flags=os.SF_NODISKIO)
2966 except OSError as err:
2967 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2968 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002969
2970
Larry Hastings9cf065c2012-06-22 16:30:09 -07002971def supports_extended_attributes():
2972 if not hasattr(os, "setxattr"):
2973 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002974
Larry Hastings9cf065c2012-06-22 16:30:09 -07002975 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002976 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002977 try:
2978 os.setxattr(fp.fileno(), b"user.test", b"")
2979 except OSError:
2980 return False
2981 finally:
2982 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002983
2984 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002985
2986
2987@unittest.skipUnless(supports_extended_attributes(),
2988 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002989# Kernels < 2.6.39 don't respect setxattr flags.
2990@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002991class ExtendedAttributeTests(unittest.TestCase):
2992
Larry Hastings9cf065c2012-06-22 16:30:09 -07002993 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002994 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002995 self.addCleanup(support.unlink, fn)
2996 create_file(fn)
2997
Benjamin Peterson799bd802011-08-31 22:15:17 -04002998 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002999 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003000 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003001
Victor Stinnerf12e5062011-10-16 22:12:03 +02003002 init_xattr = listxattr(fn)
3003 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003004
Larry Hastings9cf065c2012-06-22 16:30:09 -07003005 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003006 xattr = set(init_xattr)
3007 xattr.add("user.test")
3008 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003009 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3010 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3011 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003012
Benjamin Peterson799bd802011-08-31 22:15:17 -04003013 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003014 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003015 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003016
Benjamin Peterson799bd802011-08-31 22:15:17 -04003017 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003018 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003019 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003020
Larry Hastings9cf065c2012-06-22 16:30:09 -07003021 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003022 xattr.add("user.test2")
3023 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003024 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003025
Benjamin Peterson799bd802011-08-31 22:15:17 -04003026 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003027 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003028 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003029
Victor Stinnerf12e5062011-10-16 22:12:03 +02003030 xattr.remove("user.test")
3031 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003032 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3033 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3034 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3035 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003036 many = sorted("user.test{}".format(i) for i in range(100))
3037 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003038 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003039 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003040
Larry Hastings9cf065c2012-06-22 16:30:09 -07003041 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003042 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003043 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003044
3045 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3046 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003047
3048 def test_simple(self):
3049 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3050 os.listxattr)
3051
3052 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003053 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3054 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003055
3056 def test_fds(self):
3057 def getxattr(path, *args):
3058 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003059 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003060 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003061 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003062 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003063 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003064 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003065 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003066 def listxattr(path, *args):
3067 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003068 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003069 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3070
3071
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003072@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3073class TermsizeTests(unittest.TestCase):
3074 def test_does_not_crash(self):
3075 """Check if get_terminal_size() returns a meaningful value.
3076
3077 There's no easy portable way to actually check the size of the
3078 terminal, so let's check if it returns something sensible instead.
3079 """
3080 try:
3081 size = os.get_terminal_size()
3082 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003083 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003084 # Under win32 a generic OSError can be thrown if the
3085 # handle cannot be retrieved
3086 self.skipTest("failed to query terminal size")
3087 raise
3088
Antoine Pitroucfade362012-02-08 23:48:59 +01003089 self.assertGreaterEqual(size.columns, 0)
3090 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003091
3092 def test_stty_match(self):
3093 """Check if stty returns the same results
3094
3095 stty actually tests stdin, so get_terminal_size is invoked on
3096 stdin explicitly. If stty succeeded, then get_terminal_size()
3097 should work too.
3098 """
3099 try:
3100 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003101 except (FileNotFoundError, subprocess.CalledProcessError,
3102 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003103 self.skipTest("stty invocation failed")
3104 expected = (int(size[1]), int(size[0])) # reversed order
3105
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003106 try:
3107 actual = os.get_terminal_size(sys.__stdin__.fileno())
3108 except OSError as e:
3109 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3110 # Under win32 a generic OSError can be thrown if the
3111 # handle cannot be retrieved
3112 self.skipTest("failed to query terminal size")
3113 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003114 self.assertEqual(expected, actual)
3115
3116
Victor Stinner292c8352012-10-30 02:17:38 +01003117class OSErrorTests(unittest.TestCase):
3118 def setUp(self):
3119 class Str(str):
3120 pass
3121
Victor Stinnerafe17062012-10-31 22:47:43 +01003122 self.bytes_filenames = []
3123 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003124 if support.TESTFN_UNENCODABLE is not None:
3125 decoded = support.TESTFN_UNENCODABLE
3126 else:
3127 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003128 self.unicode_filenames.append(decoded)
3129 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003130 if support.TESTFN_UNDECODABLE is not None:
3131 encoded = support.TESTFN_UNDECODABLE
3132 else:
3133 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003134 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003135 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003136 self.bytes_filenames.append(memoryview(encoded))
3137
3138 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003139
3140 def test_oserror_filename(self):
3141 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003142 (self.filenames, os.chdir,),
3143 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003144 (self.filenames, os.lstat,),
3145 (self.filenames, os.open, os.O_RDONLY),
3146 (self.filenames, os.rmdir,),
3147 (self.filenames, os.stat,),
3148 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003149 ]
3150 if sys.platform == "win32":
3151 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003152 (self.bytes_filenames, os.rename, b"dst"),
3153 (self.bytes_filenames, os.replace, b"dst"),
3154 (self.unicode_filenames, os.rename, "dst"),
3155 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003156 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003157 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003158 else:
3159 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003160 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003161 (self.filenames, os.rename, "dst"),
3162 (self.filenames, os.replace, "dst"),
3163 ))
3164 if hasattr(os, "chown"):
3165 funcs.append((self.filenames, os.chown, 0, 0))
3166 if hasattr(os, "lchown"):
3167 funcs.append((self.filenames, os.lchown, 0, 0))
3168 if hasattr(os, "truncate"):
3169 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003170 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003171 funcs.append((self.filenames, os.chflags, 0))
3172 if hasattr(os, "lchflags"):
3173 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003174 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003175 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003176 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003177 if sys.platform == "win32":
3178 funcs.append((self.bytes_filenames, os.link, b"dst"))
3179 funcs.append((self.unicode_filenames, os.link, "dst"))
3180 else:
3181 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003182 if hasattr(os, "listxattr"):
3183 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003184 (self.filenames, os.listxattr,),
3185 (self.filenames, os.getxattr, "user.test"),
3186 (self.filenames, os.setxattr, "user.test", b'user'),
3187 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003188 ))
3189 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003190 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003191 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003192 if sys.platform == "win32":
3193 funcs.append((self.unicode_filenames, os.readlink,))
3194 else:
3195 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003196
Steve Dowercc16be82016-09-08 10:35:16 -07003197
Victor Stinnerafe17062012-10-31 22:47:43 +01003198 for filenames, func, *func_args in funcs:
3199 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003200 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003201 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003202 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003203 else:
3204 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3205 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003206 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003207 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003208 except UnicodeDecodeError:
3209 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003210 else:
3211 self.fail("No exception thrown by {}".format(func))
3212
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003213class CPUCountTests(unittest.TestCase):
3214 def test_cpu_count(self):
3215 cpus = os.cpu_count()
3216 if cpus is not None:
3217 self.assertIsInstance(cpus, int)
3218 self.assertGreater(cpus, 0)
3219 else:
3220 self.skipTest("Could not determine the number of CPUs")
3221
Victor Stinnerdaf45552013-08-28 00:53:59 +02003222
3223class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003224 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003225 fd = os.open(__file__, os.O_RDONLY)
3226 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003227 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003228
Victor Stinnerdaf45552013-08-28 00:53:59 +02003229 os.set_inheritable(fd, True)
3230 self.assertEqual(os.get_inheritable(fd), True)
3231
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003232 @unittest.skipIf(fcntl is None, "need fcntl")
3233 def test_get_inheritable_cloexec(self):
3234 fd = os.open(__file__, os.O_RDONLY)
3235 self.addCleanup(os.close, fd)
3236 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003237
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003238 # clear FD_CLOEXEC flag
3239 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3240 flags &= ~fcntl.FD_CLOEXEC
3241 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003242
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003243 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003244
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003245 @unittest.skipIf(fcntl is None, "need fcntl")
3246 def test_set_inheritable_cloexec(self):
3247 fd = os.open(__file__, os.O_RDONLY)
3248 self.addCleanup(os.close, fd)
3249 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3250 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003251
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003252 os.set_inheritable(fd, True)
3253 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3254 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003255
Victor Stinnerdaf45552013-08-28 00:53:59 +02003256 def test_open(self):
3257 fd = os.open(__file__, os.O_RDONLY)
3258 self.addCleanup(os.close, fd)
3259 self.assertEqual(os.get_inheritable(fd), False)
3260
3261 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3262 def test_pipe(self):
3263 rfd, wfd = os.pipe()
3264 self.addCleanup(os.close, rfd)
3265 self.addCleanup(os.close, wfd)
3266 self.assertEqual(os.get_inheritable(rfd), False)
3267 self.assertEqual(os.get_inheritable(wfd), False)
3268
3269 def test_dup(self):
3270 fd1 = os.open(__file__, os.O_RDONLY)
3271 self.addCleanup(os.close, fd1)
3272
3273 fd2 = os.dup(fd1)
3274 self.addCleanup(os.close, fd2)
3275 self.assertEqual(os.get_inheritable(fd2), False)
3276
3277 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3278 def test_dup2(self):
3279 fd = os.open(__file__, os.O_RDONLY)
3280 self.addCleanup(os.close, fd)
3281
3282 # inheritable by default
3283 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003284 self.addCleanup(os.close, fd2)
3285 self.assertEqual(os.dup2(fd, fd2), fd2)
3286 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003287
3288 # force non-inheritable
3289 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003290 self.addCleanup(os.close, fd3)
3291 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3292 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003293
3294 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3295 def test_openpty(self):
3296 master_fd, slave_fd = os.openpty()
3297 self.addCleanup(os.close, master_fd)
3298 self.addCleanup(os.close, slave_fd)
3299 self.assertEqual(os.get_inheritable(master_fd), False)
3300 self.assertEqual(os.get_inheritable(slave_fd), False)
3301
3302
Brett Cannon3f9183b2016-08-26 14:44:48 -07003303class PathTConverterTests(unittest.TestCase):
3304 # tuples of (function name, allows fd arguments, additional arguments to
3305 # function, cleanup function)
3306 functions = [
3307 ('stat', True, (), None),
3308 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003309 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003310 ('chflags', False, (0,), None),
3311 ('lchflags', False, (0,), None),
3312 ('open', False, (0,), getattr(os, 'close', None)),
3313 ]
3314
3315 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003316 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003317 if os.name == 'nt':
3318 bytes_fspath = bytes_filename = None
3319 else:
3320 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003321 bytes_fspath = FakePath(bytes_filename)
3322 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003323 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003324 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003325
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003326 int_fspath = FakePath(fd)
3327 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003328
3329 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3330 with self.subTest(name=name):
3331 try:
3332 fn = getattr(os, name)
3333 except AttributeError:
3334 continue
3335
Brett Cannon8f96a302016-08-26 19:30:11 -07003336 for path in (str_filename, bytes_filename, str_fspath,
3337 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003338 if path is None:
3339 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003340 with self.subTest(name=name, path=path):
3341 result = fn(path, *extra_args)
3342 if cleanup_fn is not None:
3343 cleanup_fn(result)
3344
3345 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003346 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003347 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003348
3349 if allow_fd:
3350 result = fn(fd, *extra_args) # should not fail
3351 if cleanup_fn is not None:
3352 cleanup_fn(result)
3353 else:
3354 with self.assertRaisesRegex(
3355 TypeError,
3356 'os.PathLike'):
3357 fn(fd, *extra_args)
3358
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003359 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003360 msg = r'__fspath__\(\) to return str or bytes, not %s'
3361 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003362 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003363 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003364 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003365 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003366 os.stat(FakePath(object()))
3367
Brett Cannon3f9183b2016-08-26 14:44:48 -07003368
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003369@unittest.skipUnless(hasattr(os, 'get_blocking'),
3370 'needs os.get_blocking() and os.set_blocking()')
3371class BlockingTests(unittest.TestCase):
3372 def test_blocking(self):
3373 fd = os.open(__file__, os.O_RDONLY)
3374 self.addCleanup(os.close, fd)
3375 self.assertEqual(os.get_blocking(fd), True)
3376
3377 os.set_blocking(fd, False)
3378 self.assertEqual(os.get_blocking(fd), False)
3379
3380 os.set_blocking(fd, True)
3381 self.assertEqual(os.get_blocking(fd), True)
3382
3383
Yury Selivanov97e2e062014-09-26 12:33:06 -04003384
3385class ExportsTests(unittest.TestCase):
3386 def test_os_all(self):
3387 self.assertIn('open', os.__all__)
3388 self.assertIn('walk', os.__all__)
3389
3390
Victor Stinner6036e442015-03-08 01:58:04 +01003391class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003392 check_no_resource_warning = support.check_no_resource_warning
3393
Victor Stinner6036e442015-03-08 01:58:04 +01003394 def setUp(self):
3395 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003396 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003397 self.addCleanup(support.rmtree, self.path)
3398 os.mkdir(self.path)
3399
3400 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003401 path = self.bytes_path if isinstance(name, bytes) else self.path
3402 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003403 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003404 return filename
3405
3406 def get_entries(self, names):
3407 entries = dict((entry.name, entry)
3408 for entry in os.scandir(self.path))
3409 self.assertEqual(sorted(entries.keys()), names)
3410 return entries
3411
3412 def assert_stat_equal(self, stat1, stat2, skip_fields):
3413 if skip_fields:
3414 for attr in dir(stat1):
3415 if not attr.startswith("st_"):
3416 continue
3417 if attr in ("st_dev", "st_ino", "st_nlink"):
3418 continue
3419 self.assertEqual(getattr(stat1, attr),
3420 getattr(stat2, attr),
3421 (stat1, stat2, attr))
3422 else:
3423 self.assertEqual(stat1, stat2)
3424
3425 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003426 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003427 self.assertEqual(entry.name, name)
3428 self.assertEqual(entry.path, os.path.join(self.path, name))
3429 self.assertEqual(entry.inode(),
3430 os.stat(entry.path, follow_symlinks=False).st_ino)
3431
3432 entry_stat = os.stat(entry.path)
3433 self.assertEqual(entry.is_dir(),
3434 stat.S_ISDIR(entry_stat.st_mode))
3435 self.assertEqual(entry.is_file(),
3436 stat.S_ISREG(entry_stat.st_mode))
3437 self.assertEqual(entry.is_symlink(),
3438 os.path.islink(entry.path))
3439
3440 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3441 self.assertEqual(entry.is_dir(follow_symlinks=False),
3442 stat.S_ISDIR(entry_lstat.st_mode))
3443 self.assertEqual(entry.is_file(follow_symlinks=False),
3444 stat.S_ISREG(entry_lstat.st_mode))
3445
3446 self.assert_stat_equal(entry.stat(),
3447 entry_stat,
3448 os.name == 'nt' and not is_symlink)
3449 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3450 entry_lstat,
3451 os.name == 'nt')
3452
3453 def test_attributes(self):
3454 link = hasattr(os, 'link')
3455 symlink = support.can_symlink()
3456
3457 dirname = os.path.join(self.path, "dir")
3458 os.mkdir(dirname)
3459 filename = self.create_file("file.txt")
3460 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003461 try:
3462 os.link(filename, os.path.join(self.path, "link_file.txt"))
3463 except PermissionError as e:
3464 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003465 if symlink:
3466 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3467 target_is_directory=True)
3468 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3469
3470 names = ['dir', 'file.txt']
3471 if link:
3472 names.append('link_file.txt')
3473 if symlink:
3474 names.extend(('symlink_dir', 'symlink_file.txt'))
3475 entries = self.get_entries(names)
3476
3477 entry = entries['dir']
3478 self.check_entry(entry, 'dir', True, False, False)
3479
3480 entry = entries['file.txt']
3481 self.check_entry(entry, 'file.txt', False, True, False)
3482
3483 if link:
3484 entry = entries['link_file.txt']
3485 self.check_entry(entry, 'link_file.txt', False, True, False)
3486
3487 if symlink:
3488 entry = entries['symlink_dir']
3489 self.check_entry(entry, 'symlink_dir', True, False, True)
3490
3491 entry = entries['symlink_file.txt']
3492 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3493
3494 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003495 path = self.bytes_path if isinstance(name, bytes) else self.path
3496 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003497 self.assertEqual(len(entries), 1)
3498
3499 entry = entries[0]
3500 self.assertEqual(entry.name, name)
3501 return entry
3502
Brett Cannon96881cd2016-06-10 14:37:21 -07003503 def create_file_entry(self, name='file.txt'):
3504 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003505 return self.get_entry(os.path.basename(filename))
3506
3507 def test_current_directory(self):
3508 filename = self.create_file()
3509 old_dir = os.getcwd()
3510 try:
3511 os.chdir(self.path)
3512
3513 # call scandir() without parameter: it must list the content
3514 # of the current directory
3515 entries = dict((entry.name, entry) for entry in os.scandir())
3516 self.assertEqual(sorted(entries.keys()),
3517 [os.path.basename(filename)])
3518 finally:
3519 os.chdir(old_dir)
3520
3521 def test_repr(self):
3522 entry = self.create_file_entry()
3523 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3524
Brett Cannon96881cd2016-06-10 14:37:21 -07003525 def test_fspath_protocol(self):
3526 entry = self.create_file_entry()
3527 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3528
3529 def test_fspath_protocol_bytes(self):
3530 bytes_filename = os.fsencode('bytesfile.txt')
3531 bytes_entry = self.create_file_entry(name=bytes_filename)
3532 fspath = os.fspath(bytes_entry)
3533 self.assertIsInstance(fspath, bytes)
3534 self.assertEqual(fspath,
3535 os.path.join(os.fsencode(self.path),bytes_filename))
3536
Victor Stinner6036e442015-03-08 01:58:04 +01003537 def test_removed_dir(self):
3538 path = os.path.join(self.path, 'dir')
3539
3540 os.mkdir(path)
3541 entry = self.get_entry('dir')
3542 os.rmdir(path)
3543
3544 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3545 if os.name == 'nt':
3546 self.assertTrue(entry.is_dir())
3547 self.assertFalse(entry.is_file())
3548 self.assertFalse(entry.is_symlink())
3549 if os.name == 'nt':
3550 self.assertRaises(FileNotFoundError, entry.inode)
3551 # don't fail
3552 entry.stat()
3553 entry.stat(follow_symlinks=False)
3554 else:
3555 self.assertGreater(entry.inode(), 0)
3556 self.assertRaises(FileNotFoundError, entry.stat)
3557 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3558
3559 def test_removed_file(self):
3560 entry = self.create_file_entry()
3561 os.unlink(entry.path)
3562
3563 self.assertFalse(entry.is_dir())
3564 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3565 if os.name == 'nt':
3566 self.assertTrue(entry.is_file())
3567 self.assertFalse(entry.is_symlink())
3568 if os.name == 'nt':
3569 self.assertRaises(FileNotFoundError, entry.inode)
3570 # don't fail
3571 entry.stat()
3572 entry.stat(follow_symlinks=False)
3573 else:
3574 self.assertGreater(entry.inode(), 0)
3575 self.assertRaises(FileNotFoundError, entry.stat)
3576 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3577
3578 def test_broken_symlink(self):
3579 if not support.can_symlink():
3580 return self.skipTest('cannot create symbolic link')
3581
3582 filename = self.create_file("file.txt")
3583 os.symlink(filename,
3584 os.path.join(self.path, "symlink.txt"))
3585 entries = self.get_entries(['file.txt', 'symlink.txt'])
3586 entry = entries['symlink.txt']
3587 os.unlink(filename)
3588
3589 self.assertGreater(entry.inode(), 0)
3590 self.assertFalse(entry.is_dir())
3591 self.assertFalse(entry.is_file()) # broken symlink returns False
3592 self.assertFalse(entry.is_dir(follow_symlinks=False))
3593 self.assertFalse(entry.is_file(follow_symlinks=False))
3594 self.assertTrue(entry.is_symlink())
3595 self.assertRaises(FileNotFoundError, entry.stat)
3596 # don't fail
3597 entry.stat(follow_symlinks=False)
3598
3599 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003600 self.create_file("file.txt")
3601
3602 path_bytes = os.fsencode(self.path)
3603 entries = list(os.scandir(path_bytes))
3604 self.assertEqual(len(entries), 1, entries)
3605 entry = entries[0]
3606
3607 self.assertEqual(entry.name, b'file.txt')
3608 self.assertEqual(entry.path,
3609 os.fsencode(os.path.join(self.path, 'file.txt')))
3610
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003611 def test_bytes_like(self):
3612 self.create_file("file.txt")
3613
3614 for cls in bytearray, memoryview:
3615 path_bytes = cls(os.fsencode(self.path))
3616 with self.assertWarns(DeprecationWarning):
3617 entries = list(os.scandir(path_bytes))
3618 self.assertEqual(len(entries), 1, entries)
3619 entry = entries[0]
3620
3621 self.assertEqual(entry.name, b'file.txt')
3622 self.assertEqual(entry.path,
3623 os.fsencode(os.path.join(self.path, 'file.txt')))
3624 self.assertIs(type(entry.name), bytes)
3625 self.assertIs(type(entry.path), bytes)
3626
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003627 @unittest.skipUnless(os.listdir in os.supports_fd,
3628 'fd support for listdir required for this test.')
3629 def test_fd(self):
3630 self.assertIn(os.scandir, os.supports_fd)
3631 self.create_file('file.txt')
3632 expected_names = ['file.txt']
3633 if support.can_symlink():
3634 os.symlink('file.txt', os.path.join(self.path, 'link'))
3635 expected_names.append('link')
3636
3637 fd = os.open(self.path, os.O_RDONLY)
3638 try:
3639 with os.scandir(fd) as it:
3640 entries = list(it)
3641 names = [entry.name for entry in entries]
3642 self.assertEqual(sorted(names), expected_names)
3643 self.assertEqual(names, os.listdir(fd))
3644 for entry in entries:
3645 self.assertEqual(entry.path, entry.name)
3646 self.assertEqual(os.fspath(entry), entry.name)
3647 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3648 if os.stat in os.supports_dir_fd:
3649 st = os.stat(entry.name, dir_fd=fd)
3650 self.assertEqual(entry.stat(), st)
3651 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3652 self.assertEqual(entry.stat(follow_symlinks=False), st)
3653 finally:
3654 os.close(fd)
3655
Victor Stinner6036e442015-03-08 01:58:04 +01003656 def test_empty_path(self):
3657 self.assertRaises(FileNotFoundError, os.scandir, '')
3658
3659 def test_consume_iterator_twice(self):
3660 self.create_file("file.txt")
3661 iterator = os.scandir(self.path)
3662
3663 entries = list(iterator)
3664 self.assertEqual(len(entries), 1, entries)
3665
3666 # check than consuming the iterator twice doesn't raise exception
3667 entries2 = list(iterator)
3668 self.assertEqual(len(entries2), 0, entries2)
3669
3670 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003671 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003672 self.assertRaises(TypeError, os.scandir, obj)
3673
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003674 def test_close(self):
3675 self.create_file("file.txt")
3676 self.create_file("file2.txt")
3677 iterator = os.scandir(self.path)
3678 next(iterator)
3679 iterator.close()
3680 # multiple closes
3681 iterator.close()
3682 with self.check_no_resource_warning():
3683 del iterator
3684
3685 def test_context_manager(self):
3686 self.create_file("file.txt")
3687 self.create_file("file2.txt")
3688 with os.scandir(self.path) as iterator:
3689 next(iterator)
3690 with self.check_no_resource_warning():
3691 del iterator
3692
3693 def test_context_manager_close(self):
3694 self.create_file("file.txt")
3695 self.create_file("file2.txt")
3696 with os.scandir(self.path) as iterator:
3697 next(iterator)
3698 iterator.close()
3699
3700 def test_context_manager_exception(self):
3701 self.create_file("file.txt")
3702 self.create_file("file2.txt")
3703 with self.assertRaises(ZeroDivisionError):
3704 with os.scandir(self.path) as iterator:
3705 next(iterator)
3706 1/0
3707 with self.check_no_resource_warning():
3708 del iterator
3709
3710 def test_resource_warning(self):
3711 self.create_file("file.txt")
3712 self.create_file("file2.txt")
3713 iterator = os.scandir(self.path)
3714 next(iterator)
3715 with self.assertWarns(ResourceWarning):
3716 del iterator
3717 support.gc_collect()
3718 # exhausted iterator
3719 iterator = os.scandir(self.path)
3720 list(iterator)
3721 with self.check_no_resource_warning():
3722 del iterator
3723
Victor Stinner6036e442015-03-08 01:58:04 +01003724
Ethan Furmancdc08792016-06-02 15:06:09 -07003725class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003726
3727 # Abstracted so it can be overridden to test pure Python implementation
3728 # if a C version is provided.
3729 fspath = staticmethod(os.fspath)
3730
Ethan Furmancdc08792016-06-02 15:06:09 -07003731 def test_return_bytes(self):
3732 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003733 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003734
3735 def test_return_string(self):
3736 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003737 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003738
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003739 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003740 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003741 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003742
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003743 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003744 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3745 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3746
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003747 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003748 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3749 self.assertTrue(issubclass(FakePath, os.PathLike))
3750 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003751
Ethan Furmancdc08792016-06-02 15:06:09 -07003752 def test_garbage_in_exception_out(self):
3753 vapor = type('blah', (), {})
3754 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003755 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003756
3757 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003758 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003759
Brett Cannon044283a2016-07-15 10:41:49 -07003760 def test_bad_pathlike(self):
3761 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003762 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003763 # __fspath__ attribute that is not callable.
3764 c = type('foo', (), {})
3765 c.__fspath__ = 1
3766 self.assertRaises(TypeError, self.fspath, c())
3767 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003768 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003769 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003770
Victor Stinnerc29b5852017-11-02 07:28:27 -07003771
3772class TimesTests(unittest.TestCase):
3773 def test_times(self):
3774 times = os.times()
3775 self.assertIsInstance(times, os.times_result)
3776
3777 for field in ('user', 'system', 'children_user', 'children_system',
3778 'elapsed'):
3779 value = getattr(times, field)
3780 self.assertIsInstance(value, float)
3781
3782 if os.name == 'nt':
3783 self.assertEqual(times.children_user, 0)
3784 self.assertEqual(times.children_system, 0)
3785 self.assertEqual(times.elapsed, 0)
3786
3787
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003788# Only test if the C version is provided, otherwise TestPEP519 already tested
3789# the pure Python implementation.
3790if hasattr(os, "_fspath"):
3791 class TestPEP519PurePython(TestPEP519):
3792
3793 """Explicitly test the pure Python implementation of os.fspath()."""
3794
3795 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003796
3797
Fred Drake2e2be372001-09-20 21:33:42 +00003798if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003799 unittest.main()