blob: f127cb199bbb5cb219efbfddb1fa9a0bc3c0b069 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010018import select
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010023import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020027import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020028import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020029import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070030import types
Victor Stinner47aacc82015-06-12 17:26:23 +020031import unittest
32import uuid
33import warnings
34from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080035from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030037from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080038from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080039from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070040from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020041
# Optional modules: each name is bound to None when the import fails, so
# later code can test the name instead of re-attempting the import.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
# all_users collects the pw_uid of every entry returned by pwd.getpwall();
# it is left empty when pwd cannot be imported or the lookup raises
# AttributeError.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
# Without _testcapi, both limits fall back to sys.maxsize.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Christian Heimescd9fed62020-11-13 19:48:52 +010064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080066from test.support import unix_shell
67from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = False
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted when running on FreeBSD.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd')
                    else False)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists."""
    available = hasattr(os, name)
    return unittest.skipUnless(available, f'requires os.{name}')
89
90
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened in exclusive-creation binary mode without
    buffering, so FileExistsError is raised if it already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
94
95
# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
# Decorator that skips pipe-based splice() tests on that platform.
requires_splice_pipe = unittest.skipIf(
    condition=sys.platform.startswith("aix"),
    reason='on AIX, splice() only accepts sockets',
)
99
100
Victor Stinner689830e2019-06-26 17:31:12 +0200101class MiscTests(unittest.TestCase):
102 def test_getcwd(self):
103 cwd = os.getcwd()
104 self.assertIsInstance(cwd, str)
105
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200106 def test_getcwd_long_path(self):
107 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
108 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
109 # longer path if longer paths support is enabled. Internally, the os
110 # module uses MAXPATHLEN which is at least 1024.
111 #
112 # Use a directory name of 200 characters to fit into Windows MAX_PATH
113 # limit.
114 #
115 # On Windows, the test can stop when trying to create a path longer
116 # than MAX_PATH if long paths support is disabled:
117 # see RtlAreLongPathsEnabled().
118 min_len = 2000 # characters
119 dirlen = 200 # characters
120 dirname = 'python_test_dir_'
121 dirname = dirname + ('a' * (dirlen - len(dirname)))
122
123 with tempfile.TemporaryDirectory() as tmpdir:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800124 with os_helper.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200125 expected = path
126
127 while True:
128 cwd = os.getcwd()
129 self.assertEqual(cwd, expected)
130
131 need = min_len - (len(cwd) + len(os.path.sep))
132 if need <= 0:
133 break
134 if len(dirname) > need and need > 0:
135 dirname = dirname[:need]
136
137 path = os.path.join(path, dirname)
138 try:
139 os.mkdir(path)
140 # On Windows, chdir() can fail
141 # even if mkdir() succeeded
142 os.chdir(path)
143 except FileNotFoundError:
144 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
145 # ERROR_FILENAME_EXCED_RANGE (206) errors
146 # ("The filename or extension is too long")
147 break
148 except OSError as exc:
149 if exc.errno == errno.ENAMETOOLONG:
150 break
151 else:
152 raise
153
154 expected = path
155
156 if support.verbose:
157 print(f"Tested current directory length: {len(cwd)}")
158
Victor Stinner689830e2019-06-26 17:31:12 +0200159 def test_getcwdb(self):
160 cwd = os.getcwdb()
161 self.assertIsInstance(cwd, bytes)
162 self.assertEqual(os.fsdecode(cwd), os.getcwd())
163
164
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000165# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Tests for the low-level file descriptor functions of the os module.
    # TESTFN is removed before and after every test.

    def setUp(self):
        # lexists() also catches a dangling symlink left behind by a test.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def _call_or_skip_enosys(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and return its result.

        An OSError with errno ENOSYS skips the current test: it covers the
        case in which Python was compiled on a system with the syscall but
        without support in the running kernel.  Any other OSError is
        propagated unchanged.
        """
        try:
            return func(*args, **kwargs)
        except OSError as e:
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)

    def test_access(self):
        f = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that it is not leaked into tearDown.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(
            args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists and must be overwritten.
        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.copy_file_range, in_fd, out_fd, 5)
        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        with open(TESTFN2, 'rb') as copied:
            self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.copy_file_range, in_fd, out_fd,
                                      bytes_to_copy,
                                      offset_src=in_skip,
                                      offset_dst=out_seek)
        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy + 1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # seeked bytes (5) are zero'ed
        self.assertEqual(read[:out_seek], b'\x00' * out_seek)
        # 012 are skipped (in_skip)
        # 345678 are copied in the file (in_skip + bytes_to_copy)
        self.assertEqual(read[out_seek:],
                         data[in_skip:in_skip + i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_invalid_values(self):
        with self.assertRaises(ValueError):
            os.splice(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice(self):
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        i = self._call_or_skip_enosys(os.splice, in_fd, write_fd, 5)
        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        self.assertEqual(os.read(read_fd, 100), data[:i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_in(self):
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        i = self._call_or_skip_enosys(os.splice, in_fd, write_fd,
                                      bytes_to_copy, offset_src=in_skip)
        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy + 1))

        read = os.read(read_fd, 100)
        # 012 are skipped (in_skip)
        # 345678 are copied into the pipe (in_skip + bytes_to_copy)
        self.assertEqual(read, data[in_skip:in_skip + i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_out(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        out_seek = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # The pipe is the data source here: fill it before splicing.
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)
        os.write(write_fd, data)

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_enosys(os.splice, read_fd, out_fd,
                                      bytes_to_copy, offset_dst=out_seek)
        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy + 1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # the out_seek (3) bytes before the destination offset are zero'ed
        self.assertEqual(read[:out_seek], b'\x00' * out_seek)
        # the first i bytes written to the pipe follow the offset
        self.assertEqual(read[out_seek:], data[:i])
507
508
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000509# Test attributes on return values from os.*stat* family.
510class StatAttributeTests(unittest.TestCase):
511 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800512 self.fname = os_helper.TESTFN
513 self.addCleanup(os_helper.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100514 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000515
Antoine Pitrou38425292010-09-21 18:19:07 +0000516 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000517 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000518
519 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000520 self.assertEqual(result[stat.ST_SIZE], 3)
521 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000522
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000523 # Make sure all the attributes are there
524 members = dir(result)
525 for name in dir(stat):
526 if name[:3] == 'ST_':
527 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000528 if name.endswith("TIME"):
529 def trunc(x): return int(x)
530 else:
531 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000532 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000533 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000534 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000535
Larry Hastings6fe20b32012-04-19 15:07:49 -0700536 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700537 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700538 for name in 'st_atime st_mtime st_ctime'.split():
539 floaty = int(getattr(result, name) * 100000)
540 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700541 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700542
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000543 try:
544 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200545 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000546 except IndexError:
547 pass
548
549 # Make sure that assignment fails
550 try:
551 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200552 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000553 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000554 pass
555
556 try:
557 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200558 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000559 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000560 pass
561
562 try:
563 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200564 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000565 except AttributeError:
566 pass
567
568 # Use the stat_result constructor with a too-short tuple.
569 try:
570 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200571 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000572 except TypeError:
573 pass
574
Ezio Melotti42da6632011-03-15 05:18:48 +0200575 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000576 try:
577 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
578 except TypeError:
579 pass
580
Antoine Pitrou38425292010-09-21 18:19:07 +0000581 def test_stat_attributes(self):
582 self.check_stat_attributes(self.fname)
583
584 def test_stat_attributes_bytes(self):
585 try:
586 fname = self.fname.encode(sys.getfilesystemencoding())
587 except UnicodeEncodeError:
588 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700589 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000590
Christian Heimes25827622013-10-12 01:27:08 +0200591 def test_stat_result_pickle(self):
592 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200593 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
594 p = pickle.dumps(result, proto)
595 self.assertIn(b'stat_result', p)
596 if proto < 4:
597 self.assertIn(b'cos\nstat_result\n', p)
598 unpickled = pickle.loads(p)
599 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200600
Serhiy Storchaka43767632013-11-03 21:31:38 +0200601 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000602 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700603 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000604
605 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000606 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000607
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000608 # Make sure all the attributes are there.
609 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
610 'ffree', 'favail', 'flag', 'namemax')
611 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000612 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000613
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100614 self.assertTrue(isinstance(result.f_fsid, int))
615
616 # Test that the size of the tuple doesn't change
617 self.assertEqual(len(result), 10)
618
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000619 # Make sure that assignment really fails
620 try:
621 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200622 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000623 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000624 pass
625
626 try:
627 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200628 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000629 except AttributeError:
630 pass
631
632 # Use the constructor with a too-short tuple.
633 try:
634 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200635 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000636 except TypeError:
637 pass
638
Ezio Melotti42da6632011-03-15 05:18:48 +0200639 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000640 try:
641 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
642 except TypeError:
643 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000644
Christian Heimes25827622013-10-12 01:27:08 +0200645 @unittest.skipUnless(hasattr(os, 'statvfs'),
646 "need os.statvfs()")
647 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700648 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200649
Serhiy Storchakabad12572014-12-15 14:03:42 +0200650 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
651 p = pickle.dumps(result, proto)
652 self.assertIn(b'statvfs_result', p)
653 if proto < 4:
654 self.assertIn(b'cos\nstatvfs_result\n', p)
655 unpickled = pickle.loads(p)
656 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200657
Serhiy Storchaka43767632013-11-03 21:31:38 +0200658 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
659 def test_1686475(self):
660 # Verify that an open file can be stat'ed
661 try:
662 os.stat(r"c:\pagefile.sys")
663 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600664 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200665 except OSError as e:
666 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000667
Serhiy Storchaka43767632013-11-03 21:31:38 +0200668 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
669 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
670 def test_15261(self):
671 # Verify that stat'ing a closed fd does not cause crash
672 r, w = os.pipe()
673 try:
674 os.stat(r) # should not raise error
675 finally:
676 os.close(r)
677 os.close(w)
678 with self.assertRaises(OSError) as ctx:
679 os.stat(r)
680 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100681
Zachary Ware63f277b2014-06-19 09:46:37 -0500682 def check_file_attributes(self, result):
683 self.assertTrue(hasattr(result, 'st_file_attributes'))
684 self.assertTrue(isinstance(result.st_file_attributes, int))
685 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
686
687 @unittest.skipUnless(sys.platform == "win32",
688 "st_file_attributes is Win32 specific")
689 def test_file_attributes(self):
690 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
691 result = os.stat(self.fname)
692 self.check_file_attributes(result)
693 self.assertEqual(
694 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
695 0)
696
697 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800698 dirname = os_helper.TESTFN + "dir"
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 os.mkdir(dirname)
700 self.addCleanup(os.rmdir, dirname)
701
702 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500703 self.check_file_attributes(result)
704 self.assertEqual(
705 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
706 stat.FILE_ATTRIBUTE_DIRECTORY)
707
Berker Peksag0b4dc482016-09-17 15:49:59 +0300708 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
709 def test_access_denied(self):
710 # Default to FindFirstFile WIN32_FIND_DATA when access is
711 # denied. See issue 28075.
712 # os.environ['TEMP'] should be located on a volume that
713 # supports file ACLs.
714 fname = os.path.join(os.environ['TEMP'], self.fname)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800715 self.addCleanup(os_helper.unlink, fname)
Berker Peksag0b4dc482016-09-17 15:49:59 +0300716 create_file(fname, b'ABC')
717 # Deny the right to [S]YNCHRONIZE on the file to
718 # force CreateFile to fail with ERROR_ACCESS_DENIED.
719 DETACHED_PROCESS = 8
720 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500721 # bpo-30584: Use security identifier *S-1-5-32-545 instead
722 # of localized "Users" to not depend on the locale.
723 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300724 creationflags=DETACHED_PROCESS
725 )
726 result = os.stat(fname)
727 self.assertNotEqual(result.st_size, 0)
728
Steve Dower772ec0f2019-09-04 14:42:54 -0700729 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
730 def test_stat_block_device(self):
731 # bpo-38030: os.stat fails for block devices
732 # Test a filename like "//./C:"
733 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
734 result = os.stat(fname)
735 self.assertEqual(result.st_mode, stat.S_IFBLK)
736
Victor Stinner47aacc82015-06-12 17:26:23 +0200737
738class UtimeTests(unittest.TestCase):
739 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800740 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200741 self.fname = os.path.join(self.dirname, "f1")
742
Hai Shi0c4f0f32020-06-30 21:46:31 +0800743 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200744 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100745 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200746
Victor Stinner47aacc82015-06-12 17:26:23 +0200747 def support_subsecond(self, filename):
748 # Heuristic to check if the filesystem supports timestamp with
749 # subsecond resolution: check if float and int timestamps are different
750 st = os.stat(filename)
751 return ((st.st_atime != st[7])
752 or (st.st_mtime != st[8])
753 or (st.st_ctime != st[9]))
754
755 def _test_utime(self, set_time, filename=None):
756 if not filename:
757 filename = self.fname
758
759 support_subsecond = self.support_subsecond(filename)
760 if support_subsecond:
761 # Timestamp with a resolution of 1 microsecond (10^-6).
762 #
763 # The resolution of the C internal function used by os.utime()
764 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
765 # test with a resolution of 1 ns requires more work:
766 # see the issue #15745.
767 atime_ns = 1002003000 # 1.002003 seconds
768 mtime_ns = 4005006000 # 4.005006 seconds
769 else:
770 # use a resolution of 1 second
771 atime_ns = 5 * 10**9
772 mtime_ns = 8 * 10**9
773
774 set_time(filename, (atime_ns, mtime_ns))
775 st = os.stat(filename)
776
777 if support_subsecond:
778 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
779 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
780 else:
781 self.assertEqual(st.st_atime, atime_ns * 1e-9)
782 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
783 self.assertEqual(st.st_atime_ns, atime_ns)
784 self.assertEqual(st.st_mtime_ns, mtime_ns)
785
786 def test_utime(self):
787 def set_time(filename, ns):
788 # test the ns keyword parameter
789 os.utime(filename, ns=ns)
790 self._test_utime(set_time)
791
792 @staticmethod
793 def ns_to_sec(ns):
794 # Convert a number of nanosecond (int) to a number of seconds (float).
795 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
796 # issue, os.utime() rounds towards minus infinity.
797 return (ns * 1e-9) + 0.5e-9
798
799 def test_utime_by_indexed(self):
800 # pass times as floating point seconds as the second indexed parameter
801 def set_time(filename, ns):
802 atime_ns, mtime_ns = ns
803 atime = self.ns_to_sec(atime_ns)
804 mtime = self.ns_to_sec(mtime_ns)
805 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
806 # or utime(time_t)
807 os.utime(filename, (atime, mtime))
808 self._test_utime(set_time)
809
810 def test_utime_by_times(self):
811 def set_time(filename, ns):
812 atime_ns, mtime_ns = ns
813 atime = self.ns_to_sec(atime_ns)
814 mtime = self.ns_to_sec(mtime_ns)
815 # test the times keyword parameter
816 os.utime(filename, times=(atime, mtime))
817 self._test_utime(set_time)
818
819 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
820 "follow_symlinks support for utime required "
821 "for this test.")
822 def test_utime_nofollow_symlinks(self):
823 def set_time(filename, ns):
824 # use follow_symlinks=False to test utimensat(timespec)
825 # or lutimes(timeval)
826 os.utime(filename, ns=ns, follow_symlinks=False)
827 self._test_utime(set_time)
828
829 @unittest.skipUnless(os.utime in os.supports_fd,
830 "fd support for utime required for this test.")
831 def test_utime_fd(self):
832 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100833 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200834 # use a file descriptor to test futimens(timespec)
835 # or futimes(timeval)
836 os.utime(fp.fileno(), ns=ns)
837 self._test_utime(set_time)
838
839 @unittest.skipUnless(os.utime in os.supports_dir_fd,
840 "dir_fd support for utime required for this test.")
841 def test_utime_dir_fd(self):
842 def set_time(filename, ns):
843 dirname, name = os.path.split(filename)
844 dirfd = os.open(dirname, os.O_RDONLY)
845 try:
846 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
847 os.utime(name, dir_fd=dirfd, ns=ns)
848 finally:
849 os.close(dirfd)
850 self._test_utime(set_time)
851
852 def test_utime_directory(self):
853 def set_time(filename, ns):
854 # test calling os.utime() on a directory
855 os.utime(filename, ns=ns)
856 self._test_utime(set_time, filename=self.dirname)
857
858 def _test_utime_current(self, set_time):
859 # Get the system clock
860 current = time.time()
861
862 # Call os.utime() to set the timestamp to the current system clock
863 set_time(self.fname)
864
865 if not self.support_subsecond(self.fname):
866 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700867 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200868 # On Windows, the usual resolution of time.time() is 15.6 ms.
869 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700870 #
871 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
872 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200873 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200874 st = os.stat(self.fname)
875 msg = ("st_time=%r, current=%r, dt=%r"
876 % (st.st_mtime, current, st.st_mtime - current))
877 self.assertAlmostEqual(st.st_mtime, current,
878 delta=delta, msg=msg)
879
880 def test_utime_current(self):
881 def set_time(filename):
882 # Set to the current time in the new way
883 os.utime(self.fname)
884 self._test_utime_current(set_time)
885
886 def test_utime_current_old(self):
887 def set_time(filename):
888 # Set to the current time in the old explicit way.
889 os.utime(self.fname, None)
890 self._test_utime_current(set_time)
891
892 def get_file_system(self, path):
893 if sys.platform == 'win32':
894 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
895 import ctypes
896 kernel32 = ctypes.windll.kernel32
897 buf = ctypes.create_unicode_buffer("", 100)
898 ok = kernel32.GetVolumeInformationW(root, None, 0,
899 None, None, None,
900 buf, len(buf))
901 if ok:
902 return buf.value
903 # return None if the filesystem is unknown
904
905 def test_large_time(self):
906 # Many filesystems are limited to the year 2038. At least, the test
907 # pass with NTFS filesystem.
908 if self.get_file_system(self.dirname) != "NTFS":
909 self.skipTest("requires NTFS")
910
911 large = 5000000000 # some day in 2128
912 os.utime(self.fname, (large, large))
913 self.assertEqual(os.stat(self.fname).st_mtime, large)
914
915 def test_utime_invalid_arguments(self):
916 # seconds and nanoseconds parameters are mutually exclusive
917 with self.assertRaises(ValueError):
918 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200919 with self.assertRaises(TypeError):
920 os.utime(self.fname, [5, 5])
921 with self.assertRaises(TypeError):
922 os.utime(self.fname, (5,))
923 with self.assertRaises(TypeError):
924 os.utime(self.fname, (5, 5, 5))
925 with self.assertRaises(TypeError):
926 os.utime(self.fname, ns=[5, 5])
927 with self.assertRaises(TypeError):
928 os.utime(self.fname, ns=(5,))
929 with self.assertRaises(TypeError):
930 os.utime(self.fname, ns=(5, 5, 5))
931
932 if os.utime not in os.supports_follow_symlinks:
933 with self.assertRaises(NotImplementedError):
934 os.utime(self.fname, (5, 5), follow_symlinks=False)
935 if os.utime not in os.supports_fd:
936 with open(self.fname, 'wb', 0) as fp:
937 with self.assertRaises(TypeError):
938 os.utime(fp.fileno(), (5, 5))
939 if os.utime not in os.supports_dir_fd:
940 with self.assertRaises(NotImplementedError):
941 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200942
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300943 @support.cpython_only
944 def test_issue31577(self):
945 # The interpreter shouldn't crash in case utime() received a bad
946 # ns argument.
947 def get_bad_int(divmod_ret_val):
948 class BadInt:
949 def __divmod__(*args):
950 return divmod_ret_val
951 return BadInt()
952 with self.assertRaises(TypeError):
953 os.utime(self.fname, ns=(get_bad_int(42), 1))
954 with self.assertRaises(TypeError):
955 os.utime(self.fname, ns=(get_bad_int(()), 1))
956 with self.assertRaises(TypeError):
957 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
958
Victor Stinner47aacc82015-06-12 17:26:23 +0200959
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000960from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000961
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown() can restore it, then seed
        # os.environ with the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Put back the environment captured by setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as pipe:
            echoed = pipe.read().strip()
            self.assertEqual(echoed, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as pipe:
            lines = iter(pipe)
            self.assertEqual(next(lines), "line1\n")
            self.assertEqual(next(lines), "line2\n")
            self.assertEqual(next(lines), "line3\n")
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_items = ', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(rendered_items))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself to check that
        # get_exec_path() defaults to it.
        original_environ = os.environ
        try:
            os.environ = dict(test_env)
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                 ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_as_bytes = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_as_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_as_text = raw.decode(sys.getfilesystemencoding(),
                                 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_as_text)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def child_view():
            # Run a child interpreter and return what it sees for `name`.
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(child_view(), repr(value))

            os.unsetenv(name)
            self.assertEqual(child_view(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        invalid_names = ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me')
        expected_errors = (OSError, ValueError)
        for name in invalid_names:
            self.assertRaises(expected_errors, os.putenv, name, "value")
            self.assertRaises(expected_errors, os.unsetenv, name)

        if sys.platform == "win32":
            # On Windows, an environment variable string ("name=value" string)
            # is limited to 32,767 characters
            longstr = 'x' * 32_768
            self.assertRaises(ValueError, os.putenv, longstr, "1")
            self.assertRaises(ValueError, os.putenv, "X", longstr)
            self.assertRaises(ValueError, os.unsetenv, longstr)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Looking up a missing key reports the very same key object and
        # suppresses the exception context.
        with self.assertRaises(KeyError) as lookup:
            os.environ[missing]
        self.assertIs(lookup.exception.args[0], missing)
        self.assertTrue(lookup.exception.__suppress_context__)

        # Same contract when deleting a missing key.
        with self.assertRaises(KeyError) as deletion:
            del os.environ[missing]
        self.assertIs(deletion.exception.args[0], missing)
        self.assertTrue(deletion.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over `collection`, add a key to os.environ, then
        # keep iterating: the iteration must carry on and the new key must
        # be readable.
        new_key = "__new_key__"
        walker = iter(collection)

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Compare `expected` with the value a shell child process sees for
        # `var`; without a usable shell nothing is checked.
        if unix_shell and os.path.exists(unix_shell):
            with os.popen(f"{unix_shell} -c 'echo ${var}'") as pipe:
                value = pipe.read().strip()

            self.assertEqual(expected, value)

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        extra = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = dict(os.environ)
        expected.update(extra)

        actual = os.environ | extra
        self.assertDictEqual(expected, actual)
        self.assertEqual('3', actual[overridden_key])

        extra_items = extra.items()
        self.assertIs(NotImplemented, os.environ.__or__(extra_items))

        # `|` builds a new mapping: the process environment is untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        extra = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = dict(os.environ)
        expected.update(extra)

        os.environ |= extra
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        # `|=` updates in place: the process environment follows.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        environ_before = os.environ.copy()
        with self.assertRaises(TypeError):
            dict_with_bad_key = {1: '_A_'}
            os.environ |= dict_with_bad_key

        with self.assertRaises(TypeError):
            dict_with_bad_val = {'_A_': 1}
            os.environ |= dict_with_bad_val

        # Check nothing was added.
        self.assertEqual(environ_before, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        extra_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = dict(os.environ)
        expected.update(extra_items)

        os.environ |= extra_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        extra = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = dict(extra)
        expected.update(os.environ)

        actual = extra | os.environ
        self.assertDictEqual(expected, actual)
        self.assertEqual(original_value, actual[overridden_key])

        extra_items = extra.items()
        self.assertIs(NotImplemented, os.environ.__ror__(extra_items))

        # The reflected `|` also leaves the process environment untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1253
Victor Stinner6d101392013-04-14 16:35:04 +02001254
Tim Petersc4e09402003-04-25 07:11:48 +00001255class WalkTests(unittest.TestCase):
1256 """Tests for os.walk()."""
1257
Victor Stinner0561c532015-03-12 10:28:24 +01001258 # Wrapper to hide minor differences between os.walk and os.fwalk
1259 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001260 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001261 if 'follow_symlinks' in kwargs:
1262 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001263 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001264
Charles-François Natali7372b062012-02-05 15:15:38 +01001265 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001266 join = os.path.join
Hai Shi0c4f0f32020-06-30 21:46:31 +08001267 self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001268
1269 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001270 # TESTFN/
1271 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001272 # tmp1
1273 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001274 # tmp2
1275 # SUB11/ no kids
1276 # SUB2/ a file kid and a dirsymlink kid
1277 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001278 # SUB21/ not readable
1279 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001280 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001281 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001282 # broken_link2
1283 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001284 # TEST2/
1285 # tmp4 a lone file
Hai Shi0c4f0f32020-06-30 21:46:31 +08001286 self.walk_path = join(os_helper.TESTFN, "TEST1")
Victor Stinner0561c532015-03-12 10:28:24 +01001287 self.sub1_path = join(self.walk_path, "SUB1")
1288 self.sub11_path = join(self.sub1_path, "SUB11")
1289 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001290 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001291 tmp1_path = join(self.walk_path, "tmp1")
1292 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001293 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001294 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001295 self.link_path = join(sub2_path, "link")
Hai Shi0c4f0f32020-06-30 21:46:31 +08001296 t2_path = join(os_helper.TESTFN, "TEST2")
1297 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001298 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001299 broken_link2_path = join(sub2_path, "broken_link2")
1300 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001301
1302 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001303 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001304 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001305 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001306 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001307
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001308 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03001309 with open(path, "x", encoding='utf-8') as f:
Victor Stinnere77c9742016-03-25 10:28:23 +01001310 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001311
Hai Shi0c4f0f32020-06-30 21:46:31 +08001312 if os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001313 os.symlink(os.path.abspath(t2_path), self.link_path)
1314 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001315 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1316 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001317 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001318 ["broken_link", "broken_link2", "broken_link3",
1319 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001320 else:
pxinwr3e028b22019-02-15 13:04:47 +08001321 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001322
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001323 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001324 try:
1325 os.listdir(sub21_path)
1326 except PermissionError:
1327 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1328 else:
1329 os.chmod(sub21_path, stat.S_IRWXU)
1330 os.unlink(tmp5_path)
1331 os.rmdir(sub21_path)
1332 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001333
Victor Stinner0561c532015-03-12 10:28:24 +01001334 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001335 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001336 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001337
Tim Petersc4e09402003-04-25 07:11:48 +00001338 self.assertEqual(len(all), 4)
1339 # We can't know which order SUB1 and SUB2 will appear in.
1340 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1341 # flipped: TESTFN, SUB2, SUB1, SUB11
1342 flipped = all[0][1][0] != "SUB1"
1343 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001344 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001345 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001346 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1347 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1348 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1349 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001350
Brett Cannon3f9183b2016-08-26 14:44:48 -07001351 def test_walk_prune(self, walk_path=None):
1352 if walk_path is None:
1353 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001354 # Prune the search.
1355 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001356 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001357 all.append((root, dirs, files))
1358 # Don't descend into SUB1.
1359 if 'SUB1' in dirs:
1360 # Note that this also mutates the dirs we appended to all!
1361 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001362
Victor Stinner0561c532015-03-12 10:28:24 +01001363 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001364 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001365
1366 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001367 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001368 self.assertEqual(all[1], self.sub2_tree)
1369
Brett Cannon3f9183b2016-08-26 14:44:48 -07001370 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001371 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001372
Victor Stinner0561c532015-03-12 10:28:24 +01001373 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001374 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001375 all = list(self.walk(self.walk_path, topdown=False))
1376
Victor Stinner53b0a412016-03-26 01:12:36 +01001377 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001378 # We can't know which order SUB1 and SUB2 will appear in.
1379 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1380 # flipped: SUB2, SUB11, SUB1, TESTFN
1381 flipped = all[3][1][0] != "SUB1"
1382 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001383 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001384 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001385 self.assertEqual(all[3],
1386 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1387 self.assertEqual(all[flipped],
1388 (self.sub11_path, [], []))
1389 self.assertEqual(all[flipped + 1],
1390 (self.sub1_path, ["SUB11"], ["tmp2"]))
1391 self.assertEqual(all[2 - 2 * flipped],
1392 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001393
def test_walk_symlink(self):
    # When symlinks are followed, the walk must reach self.link_path and
    # report its contents.
    if not os_helper.can_symlink():
        self.skipTest("need symlink support")

    # Walk, following symlinks, and stop at the first entry for the link.
    walker = self.walk(self.walk_path, follow_symlinks=True)
    link_entry = next(
        (entry for entry in walker if entry[0] == self.link_path), None)
    if link_entry is None:
        self.fail("Didn't follow symlink with followlinks=True")

    _, dirs, files = link_entry
    self.assertEqual(dirs, [])
    self.assertEqual(files, ["tmp4"])
Guido van Rossumd8faa362007-04-27 19:54:29 +00001407
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001408 def test_walk_bad_dir(self):
1409 # Walk top-down.
1410 errors = []
1411 walk_it = self.walk(self.walk_path, onerror=errors.append)
1412 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001413 self.assertEqual(errors, [])
1414 dir1 = 'SUB1'
1415 path1 = os.path.join(root, dir1)
1416 path1new = os.path.join(root, dir1 + '.new')
1417 os.rename(path1, path1new)
1418 try:
1419 roots = [r for r, d, f in walk_it]
1420 self.assertTrue(errors)
1421 self.assertNotIn(path1, roots)
1422 self.assertNotIn(path1new, roots)
1423 for dir2 in dirs:
1424 if dir2 != dir1:
1425 self.assertIn(os.path.join(root, dir2), roots)
1426 finally:
1427 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001428
def test_walk_many_open_files(self):
    # Advance 100 walks over a 30-level deep chain of 'd' directories in
    # lock step, first bottom-up and then top-down.
    depth = 30
    walker_count = 100
    base = os.path.join(os_helper.TESTFN, 'deep')
    deepest = os.path.join(base, *(['d'] * depth))
    os.makedirs(deepest)

    # Bottom-up: the deepest directory comes first and has no children.
    walkers = [self.walk(base, topdown=False) for _ in range(walker_count)]
    current = deepest
    for level in range(depth + 1):
        children = ['d'] if level else []
        expected = (current, children, [])
        for walker in walkers:
            self.assertEqual(next(walker), expected)
        current = os.path.dirname(current)

    # Top-down: start at the base; only the last level has no children.
    walkers = [self.walk(base, topdown=True) for _ in range(walker_count)]
    current = base
    for level in range(depth + 1):
        children = ['d'] if level < depth else []
        expected = (current, children, [])
        for walker in walkers:
            self.assertEqual(next(walker), expected)
        current = os.path.join(current, 'd')
1449
Charles-François Natali7372b062012-02-05 15:15:38 +01001450
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt fwalk() to walk()'s 3-tuples by dropping the directory fd."""
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); a separate hook so subclasses can override it."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dictionaries are copied before use.  For every
        combination of topdown and follow_symlinks, each root yielded by
        fwalk() must also be yielded by os.walk() with the same sets of
        directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The two functions spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': os_helper.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # itself fails there is nothing to close, and the original error must
        # not be masked by an UnboundLocalError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': os_helper.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = os_helper.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(os_helper.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1521
1522
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Walk the bytes form of *top*, yielding str-based 3-tuples.

        The ``follow_symlinks`` keyword is passed to os.walk() as
        ``followlinks``.
        """
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Copy edits the caller made to the decoded lists back into the
            # lists that os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1535
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on the bytes form of *top*, yielding str-based 4-tuples."""
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(encoded_top, *args, **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files, topfd)
            # Copy edits the caller made to the decoded lists back into the
            # lists that os.fwalk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1547
Charles-François Natali7372b062012-02-05 15:15:38 +01001548
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh os_helper.TESTFN directory."""

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The requested mode applies to the leaf directory only; the
                # intermediate directory gets the default mode minus the umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process-wide umask even when an assertion fails, so a
        # failure here cannot leak into the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file in a finally clause rather than with addCleanup():
        # cleanups run after tearDown(), and tearDown()'s os.removedirs()
        # would trip over a leftover regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1641
Andrew Svetlov405faed2012-12-25 12:18:09 +02001642
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), applied to a directory created at os_helper.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = os_helper.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # Floats, complex numbers, decimals and fractions are rejected for
        # either id, even though each of these is numerically equal to -1.
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        target = os_helper.TESTFN
        uid = os.stat(target).st_uid
        # Switch the group to each of the first two groups in turn.
        for wanted_gid in groups[:2]:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        target = os_helper.TESTFN
        gid = os.stat(target).st_gid
        # Hand the directory to each of the first two users in turn.
        for wanted_uid in all_users[:2]:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        target = os_helper.TESTFN
        first_uid, second_uid = all_users[:2]
        gid = os.stat(target).st_gid
        # Both calls sit inside the context manager: the second one is only
        # reached when the first one did not raise.
        with self.assertRaises(PermissionError):
            os.chown(target, first_uid, gid)
            os.chown(target, second_uid, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001701
1702
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb one level at a time; return (dira, dirb)."""
        outer = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory in the chain is empty, so all of them go.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal there.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in the leaf directory makes removedirs() fail outright.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001742
1743
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        # Data written to os.devnull is accepted; the third open() argument
        # (0) requests unbuffered binary I/O.  The with statement closes the
        # file, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from os.devnull yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001751
Andrew Svetlov405faed2012-12-25 12:18:09 +02001752
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result has exactly as many bytes as requested.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child interpreter."""
        script_lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        result = assert_python_ok('-c', '\n'.join(script_lines))
        # Index 1 of the result holds the child's standard output.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate interpreter processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001782
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001783
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1825
1826
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# The flag is true when any of the corresponding build configuration
# variables is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001833
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter, because each one tampers
    with the file descriptors or descriptor limit of the process it runs in.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is unused.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # Had urandom() read from the substituted file, it would have
        # returned the file's b"x" filler and both halves would be equal.
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1913
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001914
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    The value bound by ``with ... as`` is a list that receives one
    (function name, first arg, remaining args) tuple per call.  The stubs
    always raise, since a real exec would never return: the execv stub
    raises RuntimeError and the execve stub raises OSError(ENOTDIR).
    When *defpath* is not None, os.defpath is replaced by it as well.
    The original attributes are restored on exit.
    """
    recorded = []

    def mock_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    saved = (os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = mock_execv, mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1947
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family, mostly their argument validation."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence, or one whose first item is empty, is
        # rejected whether it is given as a tuple or as a list.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with a program name of type *test_type*.

        *test_type* is either ``bytes`` or ``str``.  The real exec functions
        are stubbed out by _execvpe_mockup(), so nothing is executed.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected execve path is the encoded one everywhere but on
            # Windows ("nt").
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # Despite the name, the bytes variant is run as well, except on
        # Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            winerror = exc.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
2052
Gregory P. Smith4ae37772010-05-08 18:05:46 +00002053
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test calls an os function on os_helper.TESTFN in a state where
    # the call is expected to fail, and checks that OSError is raised.

    def setUp(self):
        # The tests below rely on TESTFN being absent, so fail early with a
        # precise message if it exists or cannot even be stat'ed.
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            # Expected: the file does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          os_helper.TESTFN, os_helper.TESTFN + ".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        # Registered before the file is created so it is removed even if
        # the assertion fails.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # "x" creates the file exclusively; mkdir on the same name must
        # then fail while the file is still open.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002088
Victor Stinnere77c9742016-03-25 10:28:23 +01002089
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002090class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00002091 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002092 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2093 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07002094 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002095 def get_single(f):
2096 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00002097 if hasattr(os, f):
2098 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002099 return helper
2100 for f in singles:
2101 locals()["test_"+f] = get_single(f)
2102
Benjamin Peterson7522c742009-01-19 21:00:09 +00002103 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002104 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002105 f(os_helper.make_bad_fd(), *args)
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002106 except OSError as e:
2107 self.assertEqual(e.errno, errno.EBADF)
2108 else:
Martin Panter7462b6492015-11-02 03:37:02 +00002109 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002110 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00002111
Serhiy Storchaka43767632013-11-03 21:31:38 +02002112 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002113 def test_isatty(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002114 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002115
Serhiy Storchaka43767632013-11-03 21:31:38 +02002116 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002117 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002118 fd = os_helper.make_bad_fd()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002119 # Make sure none of the descriptors we are about to close are
2120 # currently valid (issue 6542).
2121 for i in range(10):
2122 try: os.fstat(fd+i)
2123 except OSError:
2124 pass
2125 else:
2126 break
2127 if i < 2:
2128 raise unittest.SkipTest(
2129 "Unable to acquire a range of invalid file descriptors")
2130 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002131
Serhiy Storchaka43767632013-11-03 21:31:38 +02002132 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002133 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002134 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002135
Serhiy Storchaka43767632013-11-03 21:31:38 +02002136 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002137 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002138 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002139
Serhiy Storchaka43767632013-11-03 21:31:38 +02002140 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002141 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002142 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002143
Serhiy Storchaka43767632013-11-03 21:31:38 +02002144 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002145 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002146 self.check(os.pathconf, "PC_NAME_MAX")
2147 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002148
Serhiy Storchaka43767632013-11-03 21:31:38 +02002149 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002150 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002151 self.check(os.truncate, 0)
2152 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002153
Serhiy Storchaka43767632013-11-03 21:31:38 +02002154 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002155 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002156 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002157
Serhiy Storchaka43767632013-11-03 21:31:38 +02002158 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002159 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002160 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002161
Victor Stinner57ddf782014-01-08 15:21:28 +01002162 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2163 def test_readv(self):
2164 buf = bytearray(10)
2165 self.check(os.readv, [buf])
2166
Serhiy Storchaka43767632013-11-03 21:31:38 +02002167 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002168 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002169 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002170
Serhiy Storchaka43767632013-11-03 21:31:38 +02002171 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002172 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002173 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002174
Victor Stinner57ddf782014-01-08 15:21:28 +01002175 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2176 def test_writev(self):
2177 self.check(os.writev, [b'abc'])
2178
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002179 def test_inheritable(self):
2180 self.check(os.get_inheritable)
2181 self.check(os.set_inheritable, True)
2182
2183 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2184 'needs os.get_blocking() and os.set_blocking()')
2185 def test_blocking(self):
2186 self.check(os.get_blocking)
2187 self.check(os.set_blocking, True)
2188
Brian Curtin1b9df392010-11-24 20:24:31 +00002189
2190class LinkTests(unittest.TestCase):
2191 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002192 self.file1 = os_helper.TESTFN
2193 self.file2 = os.path.join(os_helper.TESTFN + "2")
Brian Curtin1b9df392010-11-24 20:24:31 +00002194
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002195 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002196 for file in (self.file1, self.file2):
2197 if os.path.exists(file):
2198 os.unlink(file)
2199
Brian Curtin1b9df392010-11-24 20:24:31 +00002200 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002201 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002202
xdegaye6a55d092017-11-12 17:57:04 +01002203 try:
2204 os.link(file1, file2)
2205 except PermissionError as e:
2206 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00002207 with open(file1, "r") as f1, open(file2, "r") as f2:
2208 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2209
2210 def test_link(self):
2211 self._test_link(self.file1, self.file2)
2212
2213 def test_link_bytes(self):
2214 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2215 bytes(self.file2, sys.getfilesystemencoding()))
2216
Brian Curtinf498b752010-11-30 15:54:04 +00002217 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002218 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002219 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002220 except UnicodeError:
2221 raise unittest.SkipTest("Unable to encode for this platform.")
2222
Brian Curtinf498b752010-11-30 15:54:04 +00002223 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002224 self.file2 = self.file1 + "2"
2225 self._test_link(self.file1, self.file2)
2226
Serhiy Storchaka43767632013-11-03 21:31:38 +02002227@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2228class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002229 # uid_t and gid_t are 32-bit unsigned integers on Linux
2230 UID_OVERFLOW = (1 << 32)
2231 GID_OVERFLOW = (1 << 32)
2232
Serhiy Storchaka43767632013-11-03 21:31:38 +02002233 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2234 def test_setuid(self):
2235 if os.getuid() != 0:
2236 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002237 self.assertRaises(TypeError, os.setuid, 'not an int')
2238 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002239
Serhiy Storchaka43767632013-11-03 21:31:38 +02002240 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2241 def test_setgid(self):
2242 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2243 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002244 self.assertRaises(TypeError, os.setgid, 'not an int')
2245 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002246
Serhiy Storchaka43767632013-11-03 21:31:38 +02002247 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2248 def test_seteuid(self):
2249 if os.getuid() != 0:
2250 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002251 self.assertRaises(TypeError, os.setegid, 'not an int')
2252 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002253
Serhiy Storchaka43767632013-11-03 21:31:38 +02002254 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2255 def test_setegid(self):
2256 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2257 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002258 self.assertRaises(TypeError, os.setegid, 'not an int')
2259 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002260
Serhiy Storchaka43767632013-11-03 21:31:38 +02002261 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2262 def test_setreuid(self):
2263 if os.getuid() != 0:
2264 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002265 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2266 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2267 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2268 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002269
Serhiy Storchaka43767632013-11-03 21:31:38 +02002270 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2271 def test_setreuid_neg1(self):
2272 # Needs to accept -1. We run this in a subprocess to avoid
2273 # altering the test runner's process state (issue8045).
2274 subprocess.check_call([
2275 sys.executable, '-c',
2276 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002277
Serhiy Storchaka43767632013-11-03 21:31:38 +02002278 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2279 def test_setregid(self):
2280 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2281 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002282 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2283 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2284 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2285 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002286
Serhiy Storchaka43767632013-11-03 21:31:38 +02002287 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2288 def test_setregid_neg1(self):
2289 # Needs to accept -1. We run this in a subprocess to avoid
2290 # altering the test runner's process state (issue8045).
2291 subprocess.check_call([
2292 sys.executable, '-c',
2293 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002294
Serhiy Storchaka43767632013-11-03 21:31:38 +02002295@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2296class Pep383Tests(unittest.TestCase):
2297 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002298 if os_helper.TESTFN_UNENCODABLE:
2299 self.dir = os_helper.TESTFN_UNENCODABLE
2300 elif os_helper.TESTFN_NONASCII:
2301 self.dir = os_helper.TESTFN_NONASCII
Serhiy Storchaka43767632013-11-03 21:31:38 +02002302 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002303 self.dir = os_helper.TESTFN
Serhiy Storchaka43767632013-11-03 21:31:38 +02002304 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002305
Serhiy Storchaka43767632013-11-03 21:31:38 +02002306 bytesfn = []
2307 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002308 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002309 fn = os.fsencode(fn)
2310 except UnicodeEncodeError:
2311 return
2312 bytesfn.append(fn)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002313 add_filename(os_helper.TESTFN_UNICODE)
2314 if os_helper.TESTFN_UNENCODABLE:
2315 add_filename(os_helper.TESTFN_UNENCODABLE)
2316 if os_helper.TESTFN_NONASCII:
2317 add_filename(os_helper.TESTFN_NONASCII)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002318 if not bytesfn:
2319 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002320
Serhiy Storchaka43767632013-11-03 21:31:38 +02002321 self.unicodefn = set()
2322 os.mkdir(self.dir)
2323 try:
2324 for fn in bytesfn:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002325 os_helper.create_empty_file(os.path.join(self.bdir, fn))
Serhiy Storchaka43767632013-11-03 21:31:38 +02002326 fn = os.fsdecode(fn)
2327 if fn in self.unicodefn:
2328 raise ValueError("duplicate filename")
2329 self.unicodefn.add(fn)
2330 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002331 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002332 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002333
Serhiy Storchaka43767632013-11-03 21:31:38 +02002334 def tearDown(self):
2335 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002336
Serhiy Storchaka43767632013-11-03 21:31:38 +02002337 def test_listdir(self):
2338 expected = self.unicodefn
2339 found = set(os.listdir(self.dir))
2340 self.assertEqual(found, expected)
2341 # test listdir without arguments
2342 current_directory = os.getcwd()
2343 try:
2344 os.chdir(os.sep)
2345 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2346 finally:
2347 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002348
Serhiy Storchaka43767632013-11-03 21:31:38 +02002349 def test_open(self):
2350 for fn in self.unicodefn:
2351 f = open(os.path.join(self.dir, fn), 'rb')
2352 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002353
Serhiy Storchaka43767632013-11-03 21:31:38 +02002354 @unittest.skipUnless(hasattr(os, 'statvfs'),
2355 "need os.statvfs()")
2356 def test_statvfs(self):
2357 # issue #9645
2358 for fn in self.unicodefn:
2359 # should not fail with file not found error
2360 fullname = os.path.join(self.dir, fn)
2361 os.statvfs(fullname)
2362
2363 def test_stat(self):
2364 for fn in self.unicodefn:
2365 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002366
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows, both with plain integer "signals"
    # (_kill) and with console control events (_kill_with_event).

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The adjacent string literals are joined into one script before
        # .format() runs, so '{}' in the write() call receives *msg*.  The
        # trailing input() keeps the child alive until it is killed.
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, sleeping 0.1s between attempts, while the
        # child is still running.  The else branch runs when the loop ends
        # without a break, i.e. the message was never seen.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # reports readiness through a one-byte tagged mmap, send *event* with
        # os.kill() and fail (mentioning *name*) if it is not seen to stop.
        tagname = "test_os_%s" % uuid.uuid1()
        # One-byte mapping identified by *tagname*; it starts at 0 and the
        # loop below waits for it to become 1.
        # NOTE(review): presumably win_console_handler.py opens the same
        # tag and writes 1 once its handler is installed -- confirm there.
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): poll() returns None while the child is running, but
        # `not proc.poll()` is also true for an exit status of 0 -- confirm
        # that treating a zero exit status as "did not stop" is intended.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2481
2482
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, FILE0, SUB1 and FILE1, recording the
        # entry names in sorted order for comparison with os.listdir().
        root = os_helper.TESTFN
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(root, file_name)
            os.makedirs(os.path.join(root, dir_name))
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names.append(dir_name)
            names.append(file_name)
        names.sort()
        self.created_paths = names

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        extended = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
        listed = sorted(os.listdir(extended))
        self.assertEqual(listed, self.created_paths)

        # bytes
        extended = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
        listed = sorted(os.listdir(extended))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002529
2530
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # Tests for os.readlink() with str, bytes and path-like (FakePath)
    # arguments.  The link target is this test file itself.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        # Compare two paths (both str or both bytes) after case
        # normalisation, ignoring a leading '\\?\' prefix on Windows.
        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def strip_prefix(p):
                prefix = b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\'
                return p[4:] if p.startswith(prefix) else p
            left = strip_prefix(left)
            right = strip_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The target must exist and the link names must be free, in both
        # their str and bytes spellings.
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertTrue(os.path.exists(self.filelinkb_target))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.filelinkb))

    def test_not_symlink(self):
        # readlink() on a regular file must fail for str and path-like.
        wrapped_target = FakePath(self.filelink_target)
        for target in (self.filelink_target, wrapped_target):
            self.assertRaises(OSError, os.readlink, target)

    def test_missing_link(self):
        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
        self.assertRaises(FileNotFoundError, os.readlink,
                          FakePath('missing-link'))

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2590
2591
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@os_helper.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic-link behaviour of os/os.path on Windows."""

    # Link names are relative, so links are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # unittest assertions are used instead of bare ``assert`` so the
        # precondition checks are not stripped under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        # lexists() does not follow the link, so a stale link whose target
        # has disappeared is detected here too.
        self.assertFalse(os.path.lexists(self.dirlink))
        self.assertFalse(os.path.lexists(self.filelink))
        self.assertFalse(os.path.lexists(self.missing_link))

    def tearDown(self):
        # lexists() rather than exists(): the link itself must be removed
        # even when its target cannot be resolved.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() through the link must match the target, while lstat()
        # of the link must differ from it.  Checked for str and bytes paths.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(os_helper.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(os_helper.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Try str first, then bytes, since that is a separate code path.
            variants = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    continue
                # The link was created: remove it again, best effort.
                try:
                    os.remove(dest)
                except OSError:
                    pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
        if not aliases:
            self.skipTest("test requires an app execution alias")

        # testing the first one we see is sufficient
        alias = aliases[0]
        if support.verbose:
            print()
            print("Testing with", alias)
        st = os.lstat(alias)
        self.assertEqual(st, os.stat(alias))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2768
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Junction handling: creation via _winapi and removal via os.unlink()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and no junction is left over.
        assert os.path.exists(self.junction_target)
        assert not os.path.lexists(self.junction)

    def tearDown(self):
        leftover = os.path.lexists(self.junction)
        if leftover:
            os.unlink(self.junction)

    def test_create_junction(self):
        link, target = self.junction, self.junction_target
        _winapi.CreateJunction(target, link)
        self.assertTrue(os.path.lexists(link))
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))
        # stat() follows the junction, lstat() does not.
        self.assertNotEqual(os.stat(link), os.lstat(link))
        self.assertEqual(os.stat(link), os.stat(target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(link))
        expected = os.path.normcase("\\\\?\\" + target)
        actual = os.path.normcase(os.readlink(link))
        self.assertEqual(expected, actual)

    def test_unlink_removes_junction(self):
        link = self.junction
        _winapi.CreateJunction(self.junction_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.lexists(link))

        os.unlink(link)
        self.assertFalse(os.path.exists(link))
2802
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Compare the process handle count before and after a burst of
        # nt._getfinalpathname() / os.stat() calls; the delta must be zero.
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = wintypes.DWORD()

        def read_handle_count():
            # Refresh handle_count in place and return its current value.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = read_handle_count()

        # The first three names test the error path, __file__ tests the
        # success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        # Failure is expected
                        pass

        handle_delta = read_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002852
@os_helper.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Removes the tree created in setUp, including any link the test
        # added inside it.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is removed under
        # ``python -O``, which would leave this test checking nothing.
        self.assertTrue(os.path.isdir(src))
2883
2884
class FSEncodingTests(unittest.TestCase):
    """Basic properties of os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to check.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002898
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002899
Brett Cannonefb00c02012-02-29 18:31:31 -05002900
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        bogus_fd = 123456
        self.assertIsNone(os.device_encoding(bogus_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding name
        # must be reported and it must be known to the codecs registry.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2914
2915
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002916class PidTests(unittest.TestCase):
2917 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2918 def test_getppid(self):
2919 p = subprocess.Popen([sys.executable, '-c',
2920 'import os; print(os.getppid())'],
2921 stdout=subprocess.PIPE)
2922 stdout, _ = p.communicate()
2923 # We are the parent of our subprocess
2924 self.assertEqual(int(stdout), os.getpid())
2925
Victor Stinner9bee32b2020-04-22 16:30:35 +02002926 def check_waitpid(self, code, exitcode, callback=None):
2927 if sys.platform == 'win32':
2928 # On Windows, os.spawnv() simply joins arguments with spaces:
2929 # arguments need to be quoted
2930 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2931 else:
2932 args = [sys.executable, '-c', code]
2933 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002934
Victor Stinner9bee32b2020-04-22 16:30:35 +02002935 if callback is not None:
2936 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002937
Victor Stinner9bee32b2020-04-22 16:30:35 +02002938 # don't use support.wait_process() to test directly os.waitpid()
2939 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002940 pid2, status = os.waitpid(pid, 0)
2941 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2942 self.assertEqual(pid2, pid)
2943
Victor Stinner9bee32b2020-04-22 16:30:35 +02002944 def test_waitpid(self):
2945 self.check_waitpid(code='pass', exitcode=0)
2946
2947 def test_waitstatus_to_exitcode(self):
2948 exitcode = 23
2949 code = f'import sys; sys.exit({exitcode})'
2950 self.check_waitpid(code, exitcode=exitcode)
2951
2952 with self.assertRaises(TypeError):
2953 os.waitstatus_to_exitcode(0.0)
2954
2955 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2956 def test_waitpid_windows(self):
2957 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2958 # with exit code larger than INT_MAX.
2959 STATUS_CONTROL_C_EXIT = 0xC000013A
2960 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2961 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2962
2963 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2964 def test_waitstatus_to_exitcode_windows(self):
2965 max_exitcode = 2 ** 32 - 1
2966 for exitcode in (0, 1, 5, max_exitcode):
2967 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2968 exitcode)
2969
2970 # invalid values
2971 with self.assertRaises(ValueError):
2972 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2973 with self.assertRaises(OverflowError):
2974 os.waitstatus_to_exitcode(-1)
2975
Victor Stinner65a796e2020-04-01 18:49:29 +02002976 # Skip the test on Windows
2977 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2978 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002979 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002980 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002981
Victor Stinner9bee32b2020-04-22 16:30:35 +02002982 def kill_process(pid):
2983 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002984
Victor Stinner9bee32b2020-04-22 16:30:35 +02002985 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002986
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002987
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv that runs it.

        The script exits with ``self.exitcode``.  With *with_env*, a copy of
        os.environ plus one unique variable is stored in ``self.env`` and the
        script reads that variable before exiting.  With *use_bytes*, the
        arguments (and ``self.env``, when it was created) are encoded with
        os.fsencode().
        """
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check *spawn* (os.spawnve or os.spawnvpe) with odd environments.

        For the three invalid entries the call must either raise ValueError
        or return exit code 127.  An '=' inside a value must reach the child
        unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = [
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        ]
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3163
Serhiy Storchaka77703942017-06-25 07:33:01 +03003164
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
3173
3174
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump the priority value of this process by one, read it back,
        # then try to restore the starting value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # EACCES on restore is ignored; anything else propagates.
                if err.errno != errno.EACCES:
                    raise
3197
3198
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening asyncore dispatcher whose event loop runs in its own thread.

    Each accepted connection is wrapped in a Handler, which sends a
    "220 ready" greeting and accumulates everything it receives.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that buffers incoming data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read but not stored.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            # Everything received so far, as a single bytes object.
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception being handled instead of hiding it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        # Block until run() has signalled that the loop is starting.
        self.__flag.wait()

    def stop(self):
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even when asyncore.loop() raises;
            # the manual acquire()/release() pair left it held in that case.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception being handled instead of hiding it.
        raise
3283
3284
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003285@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3286class TestSendfile(unittest.TestCase):
3287
Victor Stinner8c663fd2017-11-08 14:44:44 -08003288 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003289 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003290 not sys.platform.startswith("solaris") and \
3291 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003292 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3293 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003294 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3295 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003296
3297 @classmethod
3298 def setUpClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003299 cls.key = threading_helper.threading_setup()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003300 create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003301
3302 @classmethod
3303 def tearDownClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003304 threading_helper.threading_cleanup(*cls.key)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003305 os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003306
3307 def setUp(self):
Serhiy Storchaka16994912020-04-25 10:06:29 +03003308 self.server = SendfileTestServer((socket_helper.HOST, 0))
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003309 self.server.start()
3310 self.client = socket.socket()
3311 self.client.connect((self.server.host, self.server.port))
3312 self.client.settimeout(1)
3313 # synchronize by waiting for "220 ready" response
3314 self.client.recv(1024)
3315 self.sockno = self.client.fileno()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003316 self.file = open(os_helper.TESTFN, 'rb')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003317 self.fileno = self.file.fileno()
3318
3319 def tearDown(self):
3320 self.file.close()
3321 self.client.close()
3322 if self.server.running:
3323 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003324 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003325
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003326 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003327 """A higher level wrapper representing how an application is
3328 supposed to use sendfile().
3329 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003330 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003331 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003332 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003333 except OSError as err:
3334 if err.errno == errno.ECONNRESET:
3335 # disconnected
3336 raise
3337 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3338 # we have to retry send data
3339 continue
3340 else:
3341 raise
3342
3343 def test_send_whole_file(self):
3344 # normal send
3345 total_sent = 0
3346 offset = 0
3347 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003348 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003349 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3350 if sent == 0:
3351 break
3352 offset += sent
3353 total_sent += sent
3354 self.assertTrue(sent <= nbytes)
3355 self.assertEqual(offset, total_sent)
3356
3357 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003358 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003359 self.client.close()
3360 self.server.wait()
3361 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003362 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003363 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003364
3365 def test_send_at_certain_offset(self):
3366 # start sending a file at a certain offset
3367 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003368 offset = len(self.DATA) // 2
3369 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003370 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003371 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003372 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3373 if sent == 0:
3374 break
3375 offset += sent
3376 total_sent += sent
3377 self.assertTrue(sent <= nbytes)
3378
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003379 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003380 self.client.close()
3381 self.server.wait()
3382 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003383 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003384 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003385 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003386 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003387
3388 def test_offset_overflow(self):
3389 # specify an offset > file size
3390 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003391 try:
3392 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3393 except OSError as e:
3394 # Solaris can raise EINVAL if offset >= file length, ignore.
3395 if e.errno != errno.EINVAL:
3396 raise
3397 else:
3398 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003399 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003400 self.client.close()
3401 self.server.wait()
3402 data = self.server.handler_instance.get_data()
3403 self.assertEqual(data, b'')
3404
3405 def test_invalid_offset(self):
3406 with self.assertRaises(OSError) as cm:
3407 os.sendfile(self.sockno, self.fileno, -1, 4096)
3408 self.assertEqual(cm.exception.errno, errno.EINVAL)
3409
Martin Panterbf19d162015-09-09 01:01:13 +00003410 def test_keywords(self):
3411 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003412 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3413 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003414 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003415 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3416 offset=0, count=4096,
3417 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003418
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003419 # --- headers / trailers tests
3420
Serhiy Storchaka43767632013-11-03 21:31:38 +02003421 @requires_headers_trailers
3422 def test_headers(self):
3423 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003424 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003425 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003426 headers=[b"x" * 512, b"y" * 256])
3427 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003428 total_sent += sent
3429 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003430 while total_sent < len(expected_data):
3431 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003432 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3433 offset, nbytes)
3434 if sent == 0:
3435 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003436 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003437 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003438 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003439
Serhiy Storchaka43767632013-11-03 21:31:38 +02003440 self.assertEqual(total_sent, len(expected_data))
3441 self.client.close()
3442 self.server.wait()
3443 data = self.server.handler_instance.get_data()
3444 self.assertEqual(hash(data), hash(expected_data))
3445
3446 @requires_headers_trailers
3447 def test_trailers(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003448 TESTFN2 = os_helper.TESTFN + "2"
Serhiy Storchaka43767632013-11-03 21:31:38 +02003449 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003450
Hai Shi0c4f0f32020-06-30 21:46:31 +08003451 self.addCleanup(os_helper.unlink, TESTFN2)
Victor Stinnerae39d232016-03-24 17:12:55 +01003452 create_file(TESTFN2, file_data)
3453
3454 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003455 os.sendfile(self.sockno, f.fileno(), 0, 5,
3456 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003457 self.client.close()
3458 self.server.wait()
3459 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003460 self.assertEqual(data, b"abcde123456789")
3461
3462 @requires_headers_trailers
3463 @requires_32b
3464 def test_headers_overflow_32bits(self):
3465 self.server.handler_instance.accumulate = False
3466 with self.assertRaises(OSError) as cm:
3467 os.sendfile(self.sockno, self.fileno, 0, 0,
3468 headers=[b"x" * 2**16] * 2**15)
3469 self.assertEqual(cm.exception.errno, errno.EINVAL)
3470
3471 @requires_headers_trailers
3472 @requires_32b
3473 def test_trailers_overflow_32bits(self):
3474 self.server.handler_instance.accumulate = False
3475 with self.assertRaises(OSError) as cm:
3476 os.sendfile(self.sockno, self.fileno, 0, 0,
3477 trailers=[b"x" * 2**16] * 2**15)
3478 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003479
Serhiy Storchaka43767632013-11-03 21:31:38 +02003480 @requires_headers_trailers
3481 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3482 'test needs os.SF_NODISKIO')
3483 def test_flags(self):
3484 try:
3485 os.sendfile(self.sockno, self.fileno, 0, 4096,
3486 flags=os.SF_NODISKIO)
3487 except OSError as err:
3488 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3489 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003490
3491
def supports_extended_attributes():
    """Return True if an xattr can be set on a file at the test location.

    False is returned when ``os.setxattr`` is missing or when setting a
    ``user.test`` attribute on a freshly created probe file fails with
    OSError.  The probe file is removed again in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe = os_helper.TESTFN
    try:
        # "x" mode: creation fails if the probe file already exists.
        with open(probe, "xb", 0) as stream:
            try:
                os.setxattr(stream.fileno(), b"user.test", b"")
            except OSError:
                supported = False
            else:
                supported = True
    finally:
        os_helper.unlink(probe)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003506
3507
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr / setxattr / removexattr / listxattr."""

    def _assert_errno(self, code, func, /, *args, **kwargs):
        # Call func(*args, **kwargs); it must raise OSError with errno == code.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run one scenario against the four callables.  *s* converts
        # attribute names to the type under test (str or bytes); **kwargs
        # is forwarded to getxattr, setxattr and removexattr.
        path = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, path)
        create_file(path)

        # Nothing has been set on the fresh file yet.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)

        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create, then replace, a single attribute.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = {*baseline, "user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name ...
        self._assert_errno(errno.EEXIST,
                           setxattr, path, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)
        # ... and XATTR_REPLACE refuses a missing one.
        self._assert_errno(errno.ENODATA,
                           setxattr, path, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone; the other one is untouched.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1 KiB value round-trips.
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # A hundred attributes at once (names passed as str, not through s).
        bulk = sorted(map("user.test{}".format, range(100)))
        for attr_name in bulk:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(bulk))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario twice: with str names, then with bytes names.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def through_fd(func, *open_args):
            # Adapt a path-taking signature to an fd-based call: open the
            # path with *open_args* and pass the descriptor to *func*.
            def call(path, *args):
                with open(path, *open_args) as stream:
                    return func(stream.fileno(), *args)
            return call

        self._check_xattrs(through_fd(os.getxattr, "rb"),
                           through_fd(os.setxattr, "wb", 0),
                           through_fd(os.removexattr, "wb", 0),
                           through_fd(os.listxattr, "rb"))
3591
3592
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _query_or_skip(self, query):
        # Return query(); skip the test when the terminal size cannot be
        # determined, re-raise any other OSError.
        try:
            return query()
        except OSError as exc:
            if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._query_or_skip(os.get_terminal_size)

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(
                ["stty", "size"], stderr=subprocess.DEVNULL, text=True
            )
            size = output.split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._query_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
3640
3641
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    """Tests for os.memfd_create()."""

    def test_memfd_create(self):
        # With an explicit MFD_CLOEXEC flag.
        flagged = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged, -1)
        self.addCleanup(os.close, flagged)
        self.assertFalse(os.get_inheritable(flagged))
        # closefd=False: closing the fd is left to the cleanup above.
        with open(flagged, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Without flags the descriptor must still be non-inheritable.
        plain = os.memfd_create("Hi")
        self.addCleanup(os.close, plain)
        self.assertFalse(os.get_inheritable(plain))
3657
3658
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
    """Tests for os.eventfd(), os.eventfd_read() and os.eventfd_write()."""

    def test_eventfd_initval(self):
        # Pack as native uint64_t
        pack = struct.Struct("@Q").pack
        size = 8  # read/write 8 bytes
        initval = 42

        fd = os.eventfd(initval)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

        # test with raw read/write
        self.assertEqual(os.read(fd, size), pack(initval))

        os.write(fd, pack(23))
        self.assertEqual(os.read(fd, size), pack(23))

        # two writes before a read: the values add up
        os.write(fd, pack(40))
        os.write(fd, pack(2))
        self.assertEqual(os.read(fd, size), pack(42))

        # test with eventfd_read/eventfd_write
        os.eventfd_write(fd, 20)
        os.eventfd_write(fd, 3)
        self.assertEqual(os.eventfd_read(fd), 23)

    def test_eventfd_semaphore(self):
        initval = 2
        flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
        fd = os.eventfd(initval, flags)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        # semaphore starts with initval 2, two reads return '1'
        for _ in range(2):
            self.assertEqual(os.eventfd_read(fd), 1)
        # third read would block
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)
        with self.assertRaises(BlockingIOError):
            os.read(fd, 8)

        # increase semaphore counter, read one
        os.eventfd_write(fd, 1)
        self.assertEqual(os.eventfd_read(fd), 1)
        # next read would block, too
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)

    def test_eventfd_select(self):
        flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
        fd = os.eventfd(0, flags)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        def poll():
            # Non-blocking select on fd in all three sets.
            readable, writable, exceptional = select.select([fd], [fd], [fd], 0)
            return (readable, writable, exceptional)

        # counter is zero, only writeable
        self.assertEqual(poll(), ([], [fd], []))

        # counter is non-zero, read and writeable
        os.eventfd_write(fd, 23)
        self.assertEqual(poll(), ([fd], [fd], []))
        self.assertEqual(os.eventfd_read(fd), 23)

        # counter at max, only readable
        os.eventfd_write(fd, (2**64) - 2)
        self.assertEqual(poll(), ([fd], [], []))
        os.eventfd_read(fd)
3740
3741
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the call."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the special test names when the platform provides them.
        unencodable = os_helper.TESTFN_UNENCODABLE
        decoded = os_helper.TESTFN if unencodable is None else unencodable
        undecodable = os_helper.TESTFN_UNDECODABLE
        if undecodable is None:
            encoded = os.fsencode(os_helper.TESTFN)
        else:
            encoded = undecodable

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded), memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (filenames to try, function, *extra positional args).
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            # On Windows the second path must match the type of the first,
            # and listdir is only tried with str names.
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                plain = isinstance(name, (str, bytes))
                try:
                    if not plain:
                        # bytearray / memoryview names must emit a
                        # DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                    else:
                        func(name, *func_args)
                except UnicodeDecodeError:
                    pass
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail("No exception thrown by {}".format(func))
3834
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        cpus = os.cpu_count()
        if cpus is None:
            # skipTest() raises, so nothing below runs in that case.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(cpus, int)
        self.assertGreater(cpus, 0)
3843
Victor Stinnerdaf45552013-08-28 00:53:59 +02003844
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        # Open this source file read-only; the fd is closed during cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()

        def cloexec_bit():
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        # FD_CLOEXEC is set on open and cleared by set_inheritable(True).
        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        for end in (rfd, wfd):
            self.addCleanup(os.close, end)
        self.assertEqual(os.get_inheritable(rfd), False)
        self.assertEqual(os.get_inheritable(wfd), False)

    def test_dup(self):
        source = self._open_this_file()

        copy = os.dup(source)
        self.addCleanup(os.close, copy)
        self.assertEqual(os.get_inheritable(copy), False)

    def test_dup_standard_stream(self):
        fd = os.dup(1)
        self.addCleanup(os.close, fd)
        self.assertGreater(fd, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, source)
        copy = os.dup(source)
        self.addCleanup(os.close, copy)
        self.assertFalse(os.get_inheritable(copy))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(fd, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private = self._open_this_file()
        self.assertEqual(os.dup2(fd, private, inheritable=False), private)
        self.assertFalse(os.get_inheritable(private))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        for end in (master_fd, slave_fd):
            self.addCleanup(os.close, end)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3937
3938
class PathTConverterTests(unittest.TestCase):
    """Check how os functions convert their path argument."""

    # One row per os function under test:
    # (function name, accepts an fd as path, extra positional arguments,
    #  callable applied to the function's result or None)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        """Call each listed function with str, bytes, path-like and fd paths.

        Functions missing from os on this platform are skipped.  A
        path-like object whose __fspath__() yields an int must be rejected
        with TypeError; a raw fd must be accepted only by the functions
        flagged as allowing it.
        """
        str_filename = os_helper.TESTFN
        if os.name != 'nt':
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        else:
            # bytes paths are not exercised when os.name is 'nt'
            bytes_filename = None
            bytes_fspath = None
        fd = os.open(FakePath(str_filename), os.O_WRONLY | os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        # Drop the placeholders left by the 'nt' branch above.
        candidate_paths = [
            candidate
            for candidate in (str_filename, bytes_filename, str_fspath,
                              bytes_fspath)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                if not hasattr(os, name):
                    continue
                fn = getattr(os, name)

                for path in candidate_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        """The TypeError message names the type returned by __fspath__()."""
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        bad_returns = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for bad_value, type_name in bad_returns:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(bad_value))
4003
Brett Cannon3f9183b2016-08-26 14:44:48 -07004004
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        """A freshly opened fd is blocking; toggling the flag is observable."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag after each change.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
4018
4019
Yury Selivanov97e2e062014-09-26 12:33:06 -04004020
class ExportsTests(unittest.TestCase):
    """Spot checks on the contents of os.__all__."""

    def test_os_all(self):
        """'open' and 'walk' must both be listed in os.__all__."""
        for exported_name in ('open', 'walk'):
            self.assertIn(exported_name, os.__all__)
4025
4026
class TestDirEntry(unittest.TestCase):
    """Checks on the os.DirEntry type itself."""

    def setUp(self):
        # Each test works in its own scratch directory; the cleanup is
        # registered before mkdir() so nothing is left behind afterwards.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        """os.DirEntry cannot be constructed directly from Python code."""
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        """Pickling a DirEntry obtained from os.scandir() raises TypeError."""
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # Close the directory iterator deterministically instead of
        # relying on it being exhausted.
        with os.scandir(self.path) as scandir_iter:
            entries = list(scandir_iter)
        entry = entries.pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Pass only the entry: the second positional parameter of
        # pickle.dumps() is the protocol, and a stray value there can raise
        # TypeError by itself and hide whether the entry is picklable.
        self.assertRaises(TypeError, pickle.dumps, entry)
4043
4044
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test runs against a scratch directory (os_helper.TESTFN) that is
    created in setUp() and removed again during cleanup.
    """

    check_no_resource_warning = warnings_helper.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(os_helper.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the scratch directory and return its path.

        A bytes *name* is joined to the bytes form of the directory, so the
        returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        Asserts that the sorted entry names are equal to *names*.
        """
        with os.scandir(self.path) as scandir_iter:
            entries = {entry.name: entry for entry in scandir_iter}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true the st_* attributes are compared one by
        one and st_dev, st_ino and st_nlink are left out; otherwise the two
        objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return
        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def test_uninstantiable(self):
        """The type of the scandir iterator cannot be instantiated."""
        # The context manager closes the iterator even if the assertion
        # fails.
        with os.scandir(self.path) as scandir_iter:
            self.assertRaises(TypeError, type(scandir_iter))

    def test_unpickable(self):
        """Pickling a scandir iterator raises TypeError."""
        self.create_file("file.txt")
        with os.scandir(self.path) as scandir_iter:
            # Pass only the iterator: the second positional parameter of
            # pickle.dumps() is the protocol, and passing a file name there
            # raises TypeError regardless of the object being pickled.
            self.assertRaises(TypeError, pickle.dumps, scandir_iter)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and os.stat().

        *is_dir*, *is_file* and *is_symlink* are the values expected from
        the corresponding DirEntry methods called with default arguments.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations supplied by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check with a stat() call that follows symlinks.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Cross-check with a stat() call that does not follow symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # The last argument selects the field-by-field comparison that
        # ignores st_dev, st_ino and st_nlink (see assert_stat_equal()).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        """Check entries for a directory, a file, a hard link and symlinks."""
        link = hasattr(os, 'link')
        symlink = os_helper.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink), kept in sorted name order
        # because get_entries() compares against the sorted listing.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([row[0] for row in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        Asserts that exactly one entry exists and that it is called *name*.
        A bytes *name* makes the scan use the bytes form of the path.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        with os.scandir(path) as scandir_iter:
            entries = list(scandir_iter)
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry found for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            with os.scandir() as scandir_iter:
                entries = {entry.name: entry for entry in scandir_iter}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        """Query an entry after its directory has been removed."""
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        """Query an entry after its file has been unlinked."""
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        """Query a symlink entry after its target has been unlinked."""
        if not os_helper.can_symlink():
            # skipTest() raises, so nothing below runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        """Scanning a bytes path yields bytes names and paths."""
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        """bytearray/memoryview paths work but emit DeprecationWarning."""
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        """Scan a directory given as an open file descriptor."""
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if os_helper.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd,
                                 follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that is dropped without being closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
4389
Victor Stinner6036e442015-03-08 01:58:04 +01004390
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and os.PathLike."""

    # The implementation under test is a class attribute so that a subclass
    # can swap in another one (for instance a pure Python version when a C
    # version is provided).
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        """bytes arguments come back as equal bytes."""
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        """str arguments come back as equal str."""
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        """os.fsencode()/os.fsdecode() accept path-like objects."""
        for inner in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(inner)

            self.assertEqual(inner, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        converted = self.fspath(FakePath('#feelthegil'))
        self.assertEqual('#feelthegil', converted)
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        """Objects that are neither str, bytes nor path-like are rejected."""
        vapor = type('blah', (), {})
        for garbage in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, garbage)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        raising = FakePath(ZeroDivisionError())
        self.assertRaises(ZeroDivisionError, self.fspath, raising)

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass

        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004447
Victor Stinnerc29b5852017-11-02 07:28:27 -07004448
class TimesTests(unittest.TestCase):
    """Checks on the result object returned by os.times()."""

    def test_times(self):
        """Every field is a float; on 'nt' three of them are always zero."""
        times = os.times()
        self.assertIsInstance(times, os.times_result)

        all_fields = ('user', 'system', 'children_user', 'children_system',
                      'elapsed')
        for field in all_fields:
            self.assertIsInstance(getattr(times, field), float)

        if os.name == 'nt':
            for field in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(times, field), 0)
4463
4464
# Only define this when os also exposes _fspath; otherwise TestPEP519 has
# already exercised the pure Python implementation through os.fspath.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004473
4474
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()