blob: 8b3d1feb78fe36d26fdfebd027af83ee55e629d3 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010018import select
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010023import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020027import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020028import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020029import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070030import types
Victor Stinner47aacc82015-06-12 17:26:23 +020031import unittest
32import uuid
33import warnings
34from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080035from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030037from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080038from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080039from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070040from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020041
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042try:
43 import resource
44except ImportError:
45 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010050try:
51 import _winapi
52except ImportError:
53 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020054try:
R David Murrayf2ad1732014-12-25 18:36:56 -050055 import pwd
56 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010057except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050058 all_users = []
59try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020060 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020061except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Christian Heimescd9fed62020-11-13 19:48:52 +010064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080066from test.support import unix_shell
67from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
R David Murrayf2ad1732014-12-25 18:36:56 -050070root_in_posix = False
71if hasattr(os, 'geteuid'):
72 root_in_posix = (os.geteuid() == 0)
73
Mark Dickinson7cf03892010-04-16 13:45:35 +000074# Detect whether we're on a Linux system that uses the (now outdated
75# and unmaintained) linuxthreads threading library. There's an issue
76# when combining linuxthreads with a failed execv call: see
77# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020078if hasattr(sys, 'thread_info') and sys.thread_info.version:
79 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
80else:
81 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000082
Stefan Krahebee49a2013-01-17 15:31:00 +010083# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
84HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
Berker Peksag4af23d72016-09-15 20:32:44 +030087def requires_os_func(name):
88 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
89
90
Victor Stinnerae39d232016-03-24 17:12:55 +010091def create_file(filename, content=b'content'):
92 with open(filename, "xb", 0) as fp:
93 fp.write(content)
94
95
Victor Stinner1de61d32020-11-17 23:08:10 +010096# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
97requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
98 'on AIX, splice() only accepts sockets')
99
100
Victor Stinner689830e2019-06-26 17:31:12 +0200101class MiscTests(unittest.TestCase):
102 def test_getcwd(self):
103 cwd = os.getcwd()
104 self.assertIsInstance(cwd, str)
105
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200106 def test_getcwd_long_path(self):
107 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
108 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
109 # longer path if longer paths support is enabled. Internally, the os
110 # module uses MAXPATHLEN which is at least 1024.
111 #
112 # Use a directory name of 200 characters to fit into Windows MAX_PATH
113 # limit.
114 #
115 # On Windows, the test can stop when trying to create a path longer
116 # than MAX_PATH if long paths support is disabled:
117 # see RtlAreLongPathsEnabled().
118 min_len = 2000 # characters
pxinwraa1b8a12020-11-29 04:21:30 +0800119 # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
120 # longer than PATH_MAX will fail.
121 if sys.platform == 'vxworks':
122 min_len = 1000
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200123 dirlen = 200 # characters
124 dirname = 'python_test_dir_'
125 dirname = dirname + ('a' * (dirlen - len(dirname)))
126
127 with tempfile.TemporaryDirectory() as tmpdir:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800128 with os_helper.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200129 expected = path
130
131 while True:
132 cwd = os.getcwd()
133 self.assertEqual(cwd, expected)
134
135 need = min_len - (len(cwd) + len(os.path.sep))
136 if need <= 0:
137 break
138 if len(dirname) > need and need > 0:
139 dirname = dirname[:need]
140
141 path = os.path.join(path, dirname)
142 try:
143 os.mkdir(path)
144 # On Windows, chdir() can fail
145 # even if mkdir() succeeded
146 os.chdir(path)
147 except FileNotFoundError:
148 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
149 # ERROR_FILENAME_EXCED_RANGE (206) errors
150 # ("The filename or extension is too long")
151 break
152 except OSError as exc:
153 if exc.errno == errno.ENAMETOOLONG:
154 break
155 else:
156 raise
157
158 expected = path
159
160 if support.verbose:
161 print(f"Tested current directory length: {len(cwd)}")
162
Victor Stinner689830e2019-06-26 17:31:12 +0200163 def test_getcwdb(self):
164 cwd = os.getcwdb()
165 self.assertIsInstance(cwd, bytes)
166 self.assertEqual(os.fsdecode(cwd), os.getcwd())
167
168
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000169# Tests creating TESTFN
170class FileTests(unittest.TestCase):
171 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800172 if os.path.lexists(os_helper.TESTFN):
173 os.unlink(os_helper.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000174 tearDown = setUp
175
176 def test_access(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800177 f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000178 os.close(f)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800179 self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000180
Christian Heimesfdab48e2008-01-20 09:06:41 +0000181 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800182 first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000183 # We must allocate two consecutive file descriptors, otherwise
184 # it will mess up other file descriptors (perhaps even the three
185 # standard ones).
186 second = os.dup(first)
187 try:
188 retries = 0
189 while second != first + 1:
190 os.close(first)
191 retries += 1
192 if retries > 10:
193 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000194 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000195 first, second = second, os.dup(second)
196 finally:
197 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000198 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000199 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000200 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000201
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000202 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000203 def test_rename(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800204 path = os_helper.TESTFN
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000205 old = sys.getrefcount(path)
206 self.assertRaises(TypeError, os.rename, path, 0)
207 new = sys.getrefcount(path)
208 self.assertEqual(old, new)
209
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000210 def test_read(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800211 with open(os_helper.TESTFN, "w+b") as fobj:
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000212 fobj.write(b"spam")
213 fobj.flush()
214 fd = fobj.fileno()
215 os.lseek(fd, 0, 0)
216 s = os.read(fd, 4)
217 self.assertEqual(type(s), bytes)
218 self.assertEqual(s, b"spam")
219
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200220 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200221 # Skip the test on 32-bit platforms: the number of bytes must fit in a
222 # Py_ssize_t type
223 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
224 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200225 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
226 def test_large_read(self, size):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800227 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
228 create_file(os_helper.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200229
230 # Issue #21932: Make sure that os.read() does not raise an
231 # OverflowError for size larger than INT_MAX
Hai Shi0c4f0f32020-06-30 21:46:31 +0800232 with open(os_helper.TESTFN, "rb") as fp:
Victor Stinnerb28ed922014-07-11 17:04:41 +0200233 data = os.read(fp.fileno(), size)
234
Victor Stinner8c663fd2017-11-08 14:44:44 -0800235 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200236 # operating system is free to return less bytes than requested.
237 self.assertEqual(data, b'test')
238
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000239 def test_write(self):
240 # os.write() accepts bytes- and buffer-like objects but not strings
Hai Shi0c4f0f32020-06-30 21:46:31 +0800241 fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000242 self.assertRaises(TypeError, os.write, fd, "beans")
243 os.write(fd, b"bacon\n")
244 os.write(fd, bytearray(b"eggs\n"))
245 os.write(fd, memoryview(b"spam\n"))
246 os.close(fd)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800247 with open(os_helper.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000248 self.assertEqual(fobj.read().splitlines(),
249 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000250
Victor Stinnere0daff12011-03-20 23:36:35 +0100251 def write_windows_console(self, *args):
252 retcode = subprocess.call(args,
253 # use a new console to not flood the test output
254 creationflags=subprocess.CREATE_NEW_CONSOLE,
255 # use a shell to hide the console window (SW_HIDE)
256 shell=True)
257 self.assertEqual(retcode, 0)
258
259 @unittest.skipUnless(sys.platform == 'win32',
260 'test specific to the Windows console')
261 def test_write_windows_console(self):
262 # Issue #11395: the Windows console returns an error (12: not enough
263 # space error) on writing into stdout if stdout mode is binary and the
264 # length is greater than 66,000 bytes (or less, depending on heap
265 # usage).
266 code = "print('x' * 100000)"
267 self.write_windows_console(sys.executable, "-c", code)
268 self.write_windows_console(sys.executable, "-u", "-c", code)
269
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000270 def fdopen_helper(self, *args):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800271 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Inada Naokia6925652021-04-29 11:35:36 +0900272 f = os.fdopen(fd, *args, encoding="utf-8")
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200273 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000274
275 def test_fdopen(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800276 fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200277 os.close(fd)
278
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000279 self.fdopen_helper()
280 self.fdopen_helper('r')
281 self.fdopen_helper('r', 100)
282
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100283 def test_replace(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800284 TESTFN2 = os_helper.TESTFN + ".2"
285 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
286 self.addCleanup(os_helper.unlink, TESTFN2)
Victor Stinnerae39d232016-03-24 17:12:55 +0100287
Hai Shi0c4f0f32020-06-30 21:46:31 +0800288 create_file(os_helper.TESTFN, b"1")
Victor Stinnerae39d232016-03-24 17:12:55 +0100289 create_file(TESTFN2, b"2")
290
Hai Shi0c4f0f32020-06-30 21:46:31 +0800291 os.replace(os_helper.TESTFN, TESTFN2)
292 self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
Inada Naokia6925652021-04-29 11:35:36 +0900293 with open(TESTFN2, 'r', encoding='utf-8') as f:
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100294 self.assertEqual(f.read(), "1")
295
Martin Panterbf19d162015-09-09 01:01:13 +0000296 def test_open_keywords(self):
297 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
298 dir_fd=None)
299 os.close(f)
300
301 def test_symlink_keywords(self):
302 symlink = support.get_attribute(os, "symlink")
303 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800304 symlink(src='target', dst=os_helper.TESTFN,
Martin Panterbf19d162015-09-09 01:01:13 +0000305 target_is_directory=False, dir_fd=None)
306 except (NotImplementedError, OSError):
307 pass # No OS support or unprivileged user
308
Pablo Galindoaac4d032019-05-31 19:39:47 +0100309 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
310 def test_copy_file_range_invalid_values(self):
311 with self.assertRaises(ValueError):
312 os.copy_file_range(0, 1, -10)
313
314 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
315 def test_copy_file_range(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800316 TESTFN2 = os_helper.TESTFN + ".3"
Pablo Galindoaac4d032019-05-31 19:39:47 +0100317 data = b'0123456789'
318
Hai Shi0c4f0f32020-06-30 21:46:31 +0800319 create_file(os_helper.TESTFN, data)
320 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100321
Hai Shi0c4f0f32020-06-30 21:46:31 +0800322 in_file = open(os_helper.TESTFN, 'rb')
Pablo Galindoaac4d032019-05-31 19:39:47 +0100323 self.addCleanup(in_file.close)
324 in_fd = in_file.fileno()
325
326 out_file = open(TESTFN2, 'w+b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800327 self.addCleanup(os_helper.unlink, TESTFN2)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100328 self.addCleanup(out_file.close)
329 out_fd = out_file.fileno()
330
331 try:
332 i = os.copy_file_range(in_fd, out_fd, 5)
333 except OSError as e:
334 # Handle the case in which Python was compiled
335 # in a system with the syscall but without support
336 # in the kernel.
337 if e.errno != errno.ENOSYS:
338 raise
339 self.skipTest(e)
340 else:
341 # The number of copied bytes can be less than
342 # the number of bytes originally requested.
343 self.assertIn(i, range(0, 6));
344
345 with open(TESTFN2, 'rb') as in_file:
346 self.assertEqual(in_file.read(), data[:i])
347
348 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
349 def test_copy_file_range_offset(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800350 TESTFN4 = os_helper.TESTFN + ".4"
Pablo Galindoaac4d032019-05-31 19:39:47 +0100351 data = b'0123456789'
352 bytes_to_copy = 6
353 in_skip = 3
354 out_seek = 5
355
Hai Shi0c4f0f32020-06-30 21:46:31 +0800356 create_file(os_helper.TESTFN, data)
357 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100358
Hai Shi0c4f0f32020-06-30 21:46:31 +0800359 in_file = open(os_helper.TESTFN, 'rb')
Pablo Galindoaac4d032019-05-31 19:39:47 +0100360 self.addCleanup(in_file.close)
361 in_fd = in_file.fileno()
362
363 out_file = open(TESTFN4, 'w+b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800364 self.addCleanup(os_helper.unlink, TESTFN4)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100365 self.addCleanup(out_file.close)
366 out_fd = out_file.fileno()
367
368 try:
369 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
370 offset_src=in_skip,
371 offset_dst=out_seek)
372 except OSError as e:
373 # Handle the case in which Python was compiled
374 # in a system with the syscall but without support
375 # in the kernel.
376 if e.errno != errno.ENOSYS:
377 raise
378 self.skipTest(e)
379 else:
380 # The number of copied bytes can be less than
381 # the number of bytes originally requested.
382 self.assertIn(i, range(0, bytes_to_copy+1));
383
384 with open(TESTFN4, 'rb') as in_file:
385 read = in_file.read()
386 # seeked bytes (5) are zero'ed
387 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
388 # 012 are skipped (in_skip)
389 # 345678 are copied in the file (in_skip + bytes_to_copy)
390 self.assertEqual(read[out_seek:],
391 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200392
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000393 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
394 def test_splice_invalid_values(self):
395 with self.assertRaises(ValueError):
396 os.splice(0, 1, -10)
397
398 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100399 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000400 def test_splice(self):
401 TESTFN2 = os_helper.TESTFN + ".3"
402 data = b'0123456789'
403
404 create_file(os_helper.TESTFN, data)
405 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
406
407 in_file = open(os_helper.TESTFN, 'rb')
408 self.addCleanup(in_file.close)
409 in_fd = in_file.fileno()
410
411 read_fd, write_fd = os.pipe()
412 self.addCleanup(lambda: os.close(read_fd))
413 self.addCleanup(lambda: os.close(write_fd))
414
415 try:
416 i = os.splice(in_fd, write_fd, 5)
417 except OSError as e:
418 # Handle the case in which Python was compiled
419 # in a system with the syscall but without support
420 # in the kernel.
421 if e.errno != errno.ENOSYS:
422 raise
423 self.skipTest(e)
424 else:
425 # The number of copied bytes can be less than
426 # the number of bytes originally requested.
427 self.assertIn(i, range(0, 6));
428
429 self.assertEqual(os.read(read_fd, 100), data[:i])
430
431 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100432 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000433 def test_splice_offset_in(self):
434 TESTFN4 = os_helper.TESTFN + ".4"
435 data = b'0123456789'
436 bytes_to_copy = 6
437 in_skip = 3
438
439 create_file(os_helper.TESTFN, data)
440 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
441
442 in_file = open(os_helper.TESTFN, 'rb')
443 self.addCleanup(in_file.close)
444 in_fd = in_file.fileno()
445
446 read_fd, write_fd = os.pipe()
447 self.addCleanup(lambda: os.close(read_fd))
448 self.addCleanup(lambda: os.close(write_fd))
449
450 try:
451 i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip)
452 except OSError as e:
453 # Handle the case in which Python was compiled
454 # in a system with the syscall but without support
455 # in the kernel.
456 if e.errno != errno.ENOSYS:
457 raise
458 self.skipTest(e)
459 else:
460 # The number of copied bytes can be less than
461 # the number of bytes originally requested.
462 self.assertIn(i, range(0, bytes_to_copy+1));
463
464 read = os.read(read_fd, 100)
465 # 012 are skipped (in_skip)
466 # 345678 are copied in the file (in_skip + bytes_to_copy)
467 self.assertEqual(read, data[in_skip:in_skip+i])
468
469 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100470 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000471 def test_splice_offset_out(self):
472 TESTFN4 = os_helper.TESTFN + ".4"
473 data = b'0123456789'
474 bytes_to_copy = 6
475 out_seek = 3
476
477 create_file(os_helper.TESTFN, data)
478 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
479
480 read_fd, write_fd = os.pipe()
481 self.addCleanup(lambda: os.close(read_fd))
482 self.addCleanup(lambda: os.close(write_fd))
483 os.write(write_fd, data)
484
485 out_file = open(TESTFN4, 'w+b')
486 self.addCleanup(os_helper.unlink, TESTFN4)
487 self.addCleanup(out_file.close)
488 out_fd = out_file.fileno()
489
490 try:
491 i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek)
492 except OSError as e:
493 # Handle the case in which Python was compiled
494 # in a system with the syscall but without support
495 # in the kernel.
496 if e.errno != errno.ENOSYS:
497 raise
498 self.skipTest(e)
499 else:
500 # The number of copied bytes can be less than
501 # the number of bytes originally requested.
502 self.assertIn(i, range(0, bytes_to_copy+1));
503
504 with open(TESTFN4, 'rb') as in_file:
505 read = in_file.read()
506 # seeked bytes (5) are zero'ed
507 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
508 # 012 are skipped (in_skip)
509 # 345678 are copied in the file (in_skip + bytes_to_copy)
510 self.assertEqual(read[out_seek:], data[:i])
511
512
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000513# Test attributes on return values from os.*stat* family.
514class StatAttributeTests(unittest.TestCase):
515 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800516 self.fname = os_helper.TESTFN
517 self.addCleanup(os_helper.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100518 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000519
Antoine Pitrou38425292010-09-21 18:19:07 +0000520 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000521 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000522
523 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000524 self.assertEqual(result[stat.ST_SIZE], 3)
525 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000526
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000527 # Make sure all the attributes are there
528 members = dir(result)
529 for name in dir(stat):
530 if name[:3] == 'ST_':
531 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000532 if name.endswith("TIME"):
533 def trunc(x): return int(x)
534 else:
535 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000536 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000537 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000538 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000539
Larry Hastings6fe20b32012-04-19 15:07:49 -0700540 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700541 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700542 for name in 'st_atime st_mtime st_ctime'.split():
543 floaty = int(getattr(result, name) * 100000)
544 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700545 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700546
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000547 try:
548 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200549 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000550 except IndexError:
551 pass
552
553 # Make sure that assignment fails
554 try:
555 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200556 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000557 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000558 pass
559
560 try:
561 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200562 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000563 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000564 pass
565
566 try:
567 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200568 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000569 except AttributeError:
570 pass
571
572 # Use the stat_result constructor with a too-short tuple.
573 try:
574 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200575 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000576 except TypeError:
577 pass
578
Ezio Melotti42da6632011-03-15 05:18:48 +0200579 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000580 try:
581 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
582 except TypeError:
583 pass
584
Antoine Pitrou38425292010-09-21 18:19:07 +0000585 def test_stat_attributes(self):
586 self.check_stat_attributes(self.fname)
587
588 def test_stat_attributes_bytes(self):
589 try:
590 fname = self.fname.encode(sys.getfilesystemencoding())
591 except UnicodeEncodeError:
592 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700593 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000594
Christian Heimes25827622013-10-12 01:27:08 +0200595 def test_stat_result_pickle(self):
596 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200597 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
598 p = pickle.dumps(result, proto)
599 self.assertIn(b'stat_result', p)
600 if proto < 4:
601 self.assertIn(b'cos\nstat_result\n', p)
602 unpickled = pickle.loads(p)
603 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200604
Serhiy Storchaka43767632013-11-03 21:31:38 +0200605 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000606 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700607 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000608
609 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000610 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000611
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000612 # Make sure all the attributes are there.
613 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
614 'ffree', 'favail', 'flag', 'namemax')
615 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000616 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000617
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100618 self.assertTrue(isinstance(result.f_fsid, int))
619
620 # Test that the size of the tuple doesn't change
621 self.assertEqual(len(result), 10)
622
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000623 # Make sure that assignment really fails
624 try:
625 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200626 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000627 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000628 pass
629
630 try:
631 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200632 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000633 except AttributeError:
634 pass
635
636 # Use the constructor with a too-short tuple.
637 try:
638 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200639 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000640 except TypeError:
641 pass
642
Ezio Melotti42da6632011-03-15 05:18:48 +0200643 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000644 try:
645 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
646 except TypeError:
647 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000648
Christian Heimes25827622013-10-12 01:27:08 +0200649 @unittest.skipUnless(hasattr(os, 'statvfs'),
650 "need os.statvfs()")
651 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700652 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200653
Serhiy Storchakabad12572014-12-15 14:03:42 +0200654 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
655 p = pickle.dumps(result, proto)
656 self.assertIn(b'statvfs_result', p)
657 if proto < 4:
658 self.assertIn(b'cos\nstatvfs_result\n', p)
659 unpickled = pickle.loads(p)
660 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200661
Serhiy Storchaka43767632013-11-03 21:31:38 +0200662 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
663 def test_1686475(self):
664 # Verify that an open file can be stat'ed
665 try:
666 os.stat(r"c:\pagefile.sys")
667 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600668 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200669 except OSError as e:
670 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000671
Serhiy Storchaka43767632013-11-03 21:31:38 +0200672 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
673 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
674 def test_15261(self):
675 # Verify that stat'ing a closed fd does not cause crash
676 r, w = os.pipe()
677 try:
678 os.stat(r) # should not raise error
679 finally:
680 os.close(r)
681 os.close(w)
682 with self.assertRaises(OSError) as ctx:
683 os.stat(r)
684 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100685
Zachary Ware63f277b2014-06-19 09:46:37 -0500686 def check_file_attributes(self, result):
687 self.assertTrue(hasattr(result, 'st_file_attributes'))
688 self.assertTrue(isinstance(result.st_file_attributes, int))
689 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
690
691 @unittest.skipUnless(sys.platform == "win32",
692 "st_file_attributes is Win32 specific")
693 def test_file_attributes(self):
694 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
695 result = os.stat(self.fname)
696 self.check_file_attributes(result)
697 self.assertEqual(
698 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
699 0)
700
701 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800702 dirname = os_helper.TESTFN + "dir"
Victor Stinner47aacc82015-06-12 17:26:23 +0200703 os.mkdir(dirname)
704 self.addCleanup(os.rmdir, dirname)
705
706 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500707 self.check_file_attributes(result)
708 self.assertEqual(
709 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
710 stat.FILE_ATTRIBUTE_DIRECTORY)
711
Berker Peksag0b4dc482016-09-17 15:49:59 +0300712 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
713 def test_access_denied(self):
714 # Default to FindFirstFile WIN32_FIND_DATA when access is
715 # denied. See issue 28075.
716 # os.environ['TEMP'] should be located on a volume that
717 # supports file ACLs.
718 fname = os.path.join(os.environ['TEMP'], self.fname)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800719 self.addCleanup(os_helper.unlink, fname)
Berker Peksag0b4dc482016-09-17 15:49:59 +0300720 create_file(fname, b'ABC')
721 # Deny the right to [S]YNCHRONIZE on the file to
722 # force CreateFile to fail with ERROR_ACCESS_DENIED.
723 DETACHED_PROCESS = 8
724 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500725 # bpo-30584: Use security identifier *S-1-5-32-545 instead
726 # of localized "Users" to not depend on the locale.
727 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300728 creationflags=DETACHED_PROCESS
729 )
730 result = os.stat(fname)
731 self.assertNotEqual(result.st_size, 0)
732
Steve Dower772ec0f2019-09-04 14:42:54 -0700733 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
734 def test_stat_block_device(self):
735 # bpo-38030: os.stat fails for block devices
736 # Test a filename like "//./C:"
737 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
738 result = os.stat(fname)
739 self.assertEqual(result.st_mode, stat.S_IFBLK)
740
Victor Stinner47aacc82015-06-12 17:26:23 +0200741
742class UtimeTests(unittest.TestCase):
743 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800744 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200745 self.fname = os.path.join(self.dirname, "f1")
746
Hai Shi0c4f0f32020-06-30 21:46:31 +0800747 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200748 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100749 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200750
Victor Stinner47aacc82015-06-12 17:26:23 +0200751 def support_subsecond(self, filename):
752 # Heuristic to check if the filesystem supports timestamp with
753 # subsecond resolution: check if float and int timestamps are different
754 st = os.stat(filename)
755 return ((st.st_atime != st[7])
756 or (st.st_mtime != st[8])
757 or (st.st_ctime != st[9]))
758
759 def _test_utime(self, set_time, filename=None):
760 if not filename:
761 filename = self.fname
762
763 support_subsecond = self.support_subsecond(filename)
764 if support_subsecond:
765 # Timestamp with a resolution of 1 microsecond (10^-6).
766 #
767 # The resolution of the C internal function used by os.utime()
768 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
769 # test with a resolution of 1 ns requires more work:
770 # see the issue #15745.
771 atime_ns = 1002003000 # 1.002003 seconds
772 mtime_ns = 4005006000 # 4.005006 seconds
773 else:
774 # use a resolution of 1 second
775 atime_ns = 5 * 10**9
776 mtime_ns = 8 * 10**9
777
778 set_time(filename, (atime_ns, mtime_ns))
779 st = os.stat(filename)
780
781 if support_subsecond:
782 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
783 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
784 else:
785 self.assertEqual(st.st_atime, atime_ns * 1e-9)
786 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
787 self.assertEqual(st.st_atime_ns, atime_ns)
788 self.assertEqual(st.st_mtime_ns, mtime_ns)
789
790 def test_utime(self):
791 def set_time(filename, ns):
792 # test the ns keyword parameter
793 os.utime(filename, ns=ns)
794 self._test_utime(set_time)
795
796 @staticmethod
797 def ns_to_sec(ns):
798 # Convert a number of nanosecond (int) to a number of seconds (float).
799 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
800 # issue, os.utime() rounds towards minus infinity.
801 return (ns * 1e-9) + 0.5e-9
802
803 def test_utime_by_indexed(self):
804 # pass times as floating point seconds as the second indexed parameter
805 def set_time(filename, ns):
806 atime_ns, mtime_ns = ns
807 atime = self.ns_to_sec(atime_ns)
808 mtime = self.ns_to_sec(mtime_ns)
809 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
810 # or utime(time_t)
811 os.utime(filename, (atime, mtime))
812 self._test_utime(set_time)
813
814 def test_utime_by_times(self):
815 def set_time(filename, ns):
816 atime_ns, mtime_ns = ns
817 atime = self.ns_to_sec(atime_ns)
818 mtime = self.ns_to_sec(mtime_ns)
819 # test the times keyword parameter
820 os.utime(filename, times=(atime, mtime))
821 self._test_utime(set_time)
822
823 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
824 "follow_symlinks support for utime required "
825 "for this test.")
826 def test_utime_nofollow_symlinks(self):
827 def set_time(filename, ns):
828 # use follow_symlinks=False to test utimensat(timespec)
829 # or lutimes(timeval)
830 os.utime(filename, ns=ns, follow_symlinks=False)
831 self._test_utime(set_time)
832
833 @unittest.skipUnless(os.utime in os.supports_fd,
834 "fd support for utime required for this test.")
835 def test_utime_fd(self):
836 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100837 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200838 # use a file descriptor to test futimens(timespec)
839 # or futimes(timeval)
840 os.utime(fp.fileno(), ns=ns)
841 self._test_utime(set_time)
842
843 @unittest.skipUnless(os.utime in os.supports_dir_fd,
844 "dir_fd support for utime required for this test.")
845 def test_utime_dir_fd(self):
846 def set_time(filename, ns):
847 dirname, name = os.path.split(filename)
848 dirfd = os.open(dirname, os.O_RDONLY)
849 try:
850 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
851 os.utime(name, dir_fd=dirfd, ns=ns)
852 finally:
853 os.close(dirfd)
854 self._test_utime(set_time)
855
856 def test_utime_directory(self):
857 def set_time(filename, ns):
858 # test calling os.utime() on a directory
859 os.utime(filename, ns=ns)
860 self._test_utime(set_time, filename=self.dirname)
861
862 def _test_utime_current(self, set_time):
863 # Get the system clock
864 current = time.time()
865
866 # Call os.utime() to set the timestamp to the current system clock
867 set_time(self.fname)
868
869 if not self.support_subsecond(self.fname):
870 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700871 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200872 # On Windows, the usual resolution of time.time() is 15.6 ms.
873 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700874 #
875 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
876 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200877 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200878 st = os.stat(self.fname)
879 msg = ("st_time=%r, current=%r, dt=%r"
880 % (st.st_mtime, current, st.st_mtime - current))
881 self.assertAlmostEqual(st.st_mtime, current,
882 delta=delta, msg=msg)
883
884 def test_utime_current(self):
885 def set_time(filename):
886 # Set to the current time in the new way
887 os.utime(self.fname)
888 self._test_utime_current(set_time)
889
890 def test_utime_current_old(self):
891 def set_time(filename):
892 # Set to the current time in the old explicit way.
893 os.utime(self.fname, None)
894 self._test_utime_current(set_time)
895
896 def get_file_system(self, path):
897 if sys.platform == 'win32':
898 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
899 import ctypes
900 kernel32 = ctypes.windll.kernel32
901 buf = ctypes.create_unicode_buffer("", 100)
902 ok = kernel32.GetVolumeInformationW(root, None, 0,
903 None, None, None,
904 buf, len(buf))
905 if ok:
906 return buf.value
907 # return None if the filesystem is unknown
908
909 def test_large_time(self):
910 # Many filesystems are limited to the year 2038. At least, the test
911 # pass with NTFS filesystem.
912 if self.get_file_system(self.dirname) != "NTFS":
913 self.skipTest("requires NTFS")
914
915 large = 5000000000 # some day in 2128
916 os.utime(self.fname, (large, large))
917 self.assertEqual(os.stat(self.fname).st_mtime, large)
918
919 def test_utime_invalid_arguments(self):
920 # seconds and nanoseconds parameters are mutually exclusive
921 with self.assertRaises(ValueError):
922 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200923 with self.assertRaises(TypeError):
924 os.utime(self.fname, [5, 5])
925 with self.assertRaises(TypeError):
926 os.utime(self.fname, (5,))
927 with self.assertRaises(TypeError):
928 os.utime(self.fname, (5, 5, 5))
929 with self.assertRaises(TypeError):
930 os.utime(self.fname, ns=[5, 5])
931 with self.assertRaises(TypeError):
932 os.utime(self.fname, ns=(5,))
933 with self.assertRaises(TypeError):
934 os.utime(self.fname, ns=(5, 5, 5))
935
936 if os.utime not in os.supports_follow_symlinks:
937 with self.assertRaises(NotImplementedError):
938 os.utime(self.fname, (5, 5), follow_symlinks=False)
939 if os.utime not in os.supports_fd:
940 with open(self.fname, 'wb', 0) as fp:
941 with self.assertRaises(TypeError):
942 os.utime(fp.fileno(), (5, 5))
943 if os.utime not in os.supports_dir_fd:
944 with self.assertRaises(NotImplementedError):
945 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200946
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300947 @support.cpython_only
948 def test_issue31577(self):
949 # The interpreter shouldn't crash in case utime() received a bad
950 # ns argument.
951 def get_bad_int(divmod_ret_val):
952 class BadInt:
953 def __divmod__(*args):
954 return divmod_ret_val
955 return BadInt()
956 with self.assertRaises(TypeError):
957 os.utime(self.fname, ns=(get_bad_int(42), 1))
958 with self.assertRaises(TypeError):
959 os.utime(self.fname, ns=(get_bad_int(()), 1))
960 with self.assertRaises(TypeError):
961 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
962
Victor Stinner47aacc82015-06-12 17:26:23 +0200963
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000964from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000965
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000966class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000967 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000968 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000969
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000970 def setUp(self):
971 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000972 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000973 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000974 for key, value in self._reference().items():
975 os.environ[key] = value
976
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000977 def tearDown(self):
978 os.environ.clear()
979 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000980 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000981 os.environb.clear()
982 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000983
Christian Heimes90333392007-11-01 19:08:42 +0000984 def _reference(self):
985 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
986
987 def _empty_mapping(self):
988 os.environ.clear()
989 return os.environ
990
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000991 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200992 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
993 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +0800994 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Martin v. Löwis5510f652005-02-17 21:23:20 +0000995 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000996 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300997 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200998 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300999 value = popen.read().strip()
1000 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +00001001
Xavier de Gayed1415312016-07-22 12:15:29 +02001002 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1003 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +08001004 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Christian Heimes1a13d592007-11-08 14:16:55 +00001005 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +02001006 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
1007 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +03001008 it = iter(popen)
1009 self.assertEqual(next(it), "line1\n")
1010 self.assertEqual(next(it), "line2\n")
1011 self.assertEqual(next(it), "line3\n")
1012 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +00001013
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001014 # Verify environ keys and values from the OS are of the
1015 # correct str type.
1016 def test_keyvalue_types(self):
1017 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001018 self.assertEqual(type(key), str)
1019 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001020
Christian Heimes90333392007-11-01 19:08:42 +00001021 def test_items(self):
1022 for key, value in self._reference().items():
1023 self.assertEqual(os.environ.get(key), value)
1024
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001025 # Issue 7310
1026 def test___repr__(self):
1027 """Check that the repr() of os.environ looks like environ({...})."""
1028 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +00001029 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
1030 '{!r}: {!r}'.format(key, value)
1031 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001032
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +00001033 def test_get_exec_path(self):
1034 defpath_list = os.defpath.split(os.pathsep)
1035 test_path = ['/monty', '/python', '', '/flying/circus']
1036 test_env = {'PATH': os.pathsep.join(test_path)}
1037
1038 saved_environ = os.environ
1039 try:
1040 os.environ = dict(test_env)
1041 # Test that defaulting to os.environ works.
1042 self.assertSequenceEqual(test_path, os.get_exec_path())
1043 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
1044 finally:
1045 os.environ = saved_environ
1046
1047 # No PATH environment variable
1048 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
1049 # Empty PATH environment variable
1050 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
1051 # Supplied PATH environment variable
1052 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
1053
Victor Stinnerb745a742010-05-18 17:17:23 +00001054 if os.supports_bytes_environ:
1055 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +00001056 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001057 # ignore BytesWarning warning
1058 with warnings.catch_warnings(record=True):
1059 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +00001060 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001061 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +00001062 pass
1063 else:
1064 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +00001065
1066 # bytes key and/or value
1067 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
1068 ['abc'])
1069 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
1070 ['abc'])
1071 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
1072 ['abc'])
1073
1074 @unittest.skipUnless(os.supports_bytes_environ,
1075 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +00001076 def test_environb(self):
1077 # os.environ -> os.environb
1078 value = 'euro\u20ac'
1079 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001080 value_bytes = value.encode(sys.getfilesystemencoding(),
1081 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +00001082 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001083 msg = "U+20AC character is not encodable to %s" % (
1084 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +00001085 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +00001086 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001087 self.assertEqual(os.environ['unicode'], value)
1088 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +00001089
1090 # os.environb -> os.environ
1091 value = b'\xff'
1092 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001093 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +00001094 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001095 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001096
Victor Stinner161e7b32020-01-24 11:53:44 +01001097 def test_putenv_unsetenv(self):
1098 name = "PYTHONTESTVAR"
1099 value = "testvalue"
1100 code = f'import os; print(repr(os.environ.get({name!r})))'
1101
Hai Shi0c4f0f32020-06-30 21:46:31 +08001102 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner161e7b32020-01-24 11:53:44 +01001103 env.pop(name, None)
1104
1105 os.putenv(name, value)
1106 proc = subprocess.run([sys.executable, '-c', code], check=True,
1107 stdout=subprocess.PIPE, text=True)
1108 self.assertEqual(proc.stdout.rstrip(), repr(value))
1109
1110 os.unsetenv(name)
1111 proc = subprocess.run([sys.executable, '-c', code], check=True,
1112 stdout=subprocess.PIPE, text=True)
1113 self.assertEqual(proc.stdout.rstrip(), repr(None))
1114
Victor Stinner13ff2452018-01-22 18:32:50 +01001115 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +01001116 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +01001117 def test_putenv_unsetenv_error(self):
1118 # Empty variable name is invalid.
1119 # "=" and null character are not allowed in a variable name.
1120 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
1121 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
1122 self.assertRaises((OSError, ValueError), os.unsetenv, name)
1123
Victor Stinnerb73dd022020-01-22 21:11:17 +01001124 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +01001125 # On Windows, an environment variable string ("name=value" string)
1126 # is limited to 32,767 characters
1127 longstr = 'x' * 32_768
1128 self.assertRaises(ValueError, os.putenv, longstr, "1")
1129 self.assertRaises(ValueError, os.putenv, "X", longstr)
1130 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +01001131
Victor Stinner6d101392013-04-14 16:35:04 +02001132 def test_key_type(self):
1133 missing = 'missingkey'
1134 self.assertNotIn(missing, os.environ)
1135
Victor Stinner839e5ea2013-04-14 16:43:03 +02001136 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001137 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001138 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001139 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001140
Victor Stinner839e5ea2013-04-14 16:43:03 +02001141 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001142 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001143 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001144 self.assertTrue(cm.exception.__suppress_context__)
1145
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001146 def _test_environ_iteration(self, collection):
1147 iterator = iter(collection)
1148 new_key = "__new_key__"
1149
1150 next(iterator) # start iteration over os.environ.items
1151
1152 # add a new key in os.environ mapping
1153 os.environ[new_key] = "test_environ_iteration"
1154
1155 try:
1156 next(iterator) # force iteration over modified mapping
1157 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1158 finally:
1159 del os.environ[new_key]
1160
1161 def test_iter_error_when_changing_os_environ(self):
1162 self._test_environ_iteration(os.environ)
1163
1164 def test_iter_error_when_changing_os_environ_items(self):
1165 self._test_environ_iteration(os.environ.items())
1166
1167 def test_iter_error_when_changing_os_environ_values(self):
1168 self._test_environ_iteration(os.environ.values())
1169
Charles Burklandd648ef12020-03-13 09:04:43 -07001170 def _test_underlying_process_env(self, var, expected):
1171 if not (unix_shell and os.path.exists(unix_shell)):
1172 return
1173
1174 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1175 value = popen.read().strip()
1176
1177 self.assertEqual(expected, value)
1178
1179 def test_or_operator(self):
1180 overridden_key = '_TEST_VAR_'
1181 original_value = 'original_value'
1182 os.environ[overridden_key] = original_value
1183
1184 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1185 expected = dict(os.environ)
1186 expected.update(new_vars_dict)
1187
1188 actual = os.environ | new_vars_dict
1189 self.assertDictEqual(expected, actual)
1190 self.assertEqual('3', actual[overridden_key])
1191
1192 new_vars_items = new_vars_dict.items()
1193 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1194
1195 self._test_underlying_process_env('_A_', '')
1196 self._test_underlying_process_env(overridden_key, original_value)
1197
1198 def test_ior_operator(self):
1199 overridden_key = '_TEST_VAR_'
1200 os.environ[overridden_key] = 'original_value'
1201
1202 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1203 expected = dict(os.environ)
1204 expected.update(new_vars_dict)
1205
1206 os.environ |= new_vars_dict
1207 self.assertEqual(expected, os.environ)
1208 self.assertEqual('3', os.environ[overridden_key])
1209
1210 self._test_underlying_process_env('_A_', '1')
1211 self._test_underlying_process_env(overridden_key, '3')
1212
1213 def test_ior_operator_invalid_dicts(self):
1214 os_environ_copy = os.environ.copy()
1215 with self.assertRaises(TypeError):
1216 dict_with_bad_key = {1: '_A_'}
1217 os.environ |= dict_with_bad_key
1218
1219 with self.assertRaises(TypeError):
1220 dict_with_bad_val = {'_A_': 1}
1221 os.environ |= dict_with_bad_val
1222
1223 # Check nothing was added.
1224 self.assertEqual(os_environ_copy, os.environ)
1225
1226 def test_ior_operator_key_value_iterable(self):
1227 overridden_key = '_TEST_VAR_'
1228 os.environ[overridden_key] = 'original_value'
1229
1230 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1231 expected = dict(os.environ)
1232 expected.update(new_vars_items)
1233
1234 os.environ |= new_vars_items
1235 self.assertEqual(expected, os.environ)
1236 self.assertEqual('3', os.environ[overridden_key])
1237
1238 self._test_underlying_process_env('_A_', '1')
1239 self._test_underlying_process_env(overridden_key, '3')
1240
1241 def test_ror_operator(self):
1242 overridden_key = '_TEST_VAR_'
1243 original_value = 'original_value'
1244 os.environ[overridden_key] = original_value
1245
1246 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1247 expected = dict(new_vars_dict)
1248 expected.update(os.environ)
1249
1250 actual = new_vars_dict | os.environ
1251 self.assertDictEqual(expected, actual)
1252 self.assertEqual(original_value, actual[overridden_key])
1253
1254 new_vars_items = new_vars_dict.items()
1255 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1256
1257 self._test_underlying_process_env('_A_', '')
1258 self._test_underlying_process_env(overridden_key, original_value)
1259
Victor Stinner6d101392013-04-14 16:35:04 +02001260
Tim Petersc4e09402003-04-25 07:11:48 +00001261class WalkTests(unittest.TestCase):
1262 """Tests for os.walk()."""
1263
Victor Stinner0561c532015-03-12 10:28:24 +01001264 # Wrapper to hide minor differences between os.walk and os.fwalk
1265 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001266 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001267 if 'follow_symlinks' in kwargs:
1268 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001269 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001270
Charles-François Natali7372b062012-02-05 15:15:38 +01001271 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001272 join = os.path.join
Hai Shi0c4f0f32020-06-30 21:46:31 +08001273 self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001274
1275 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001276 # TESTFN/
1277 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001278 # tmp1
1279 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001280 # tmp2
1281 # SUB11/ no kids
1282 # SUB2/ a file kid and a dirsymlink kid
1283 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001284 # SUB21/ not readable
1285 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001286 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001287 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001288 # broken_link2
1289 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001290 # TEST2/
1291 # tmp4 a lone file
Hai Shi0c4f0f32020-06-30 21:46:31 +08001292 self.walk_path = join(os_helper.TESTFN, "TEST1")
Victor Stinner0561c532015-03-12 10:28:24 +01001293 self.sub1_path = join(self.walk_path, "SUB1")
1294 self.sub11_path = join(self.sub1_path, "SUB11")
1295 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001296 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001297 tmp1_path = join(self.walk_path, "tmp1")
1298 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001299 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001300 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001301 self.link_path = join(sub2_path, "link")
Hai Shi0c4f0f32020-06-30 21:46:31 +08001302 t2_path = join(os_helper.TESTFN, "TEST2")
1303 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001304 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001305 broken_link2_path = join(sub2_path, "broken_link2")
1306 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001307
1308 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001309 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001310 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001311 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001312 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001313
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001314 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03001315 with open(path, "x", encoding='utf-8') as f:
Victor Stinnere77c9742016-03-25 10:28:23 +01001316 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001317
Hai Shi0c4f0f32020-06-30 21:46:31 +08001318 if os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001319 os.symlink(os.path.abspath(t2_path), self.link_path)
1320 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001321 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1322 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001323 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001324 ["broken_link", "broken_link2", "broken_link3",
1325 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001326 else:
pxinwr3e028b22019-02-15 13:04:47 +08001327 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001328
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001329 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001330 try:
1331 os.listdir(sub21_path)
1332 except PermissionError:
1333 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1334 else:
1335 os.chmod(sub21_path, stat.S_IRWXU)
1336 os.unlink(tmp5_path)
1337 os.rmdir(sub21_path)
1338 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001339
Victor Stinner0561c532015-03-12 10:28:24 +01001340 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001341 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001342 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001343
Tim Petersc4e09402003-04-25 07:11:48 +00001344 self.assertEqual(len(all), 4)
1345 # We can't know which order SUB1 and SUB2 will appear in.
1346 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1347 # flipped: TESTFN, SUB2, SUB1, SUB11
1348 flipped = all[0][1][0] != "SUB1"
1349 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001350 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001351 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001352 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1353 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1354 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1355 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001356
Brett Cannon3f9183b2016-08-26 14:44:48 -07001357 def test_walk_prune(self, walk_path=None):
1358 if walk_path is None:
1359 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001360 # Prune the search.
1361 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001362 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001363 all.append((root, dirs, files))
1364 # Don't descend into SUB1.
1365 if 'SUB1' in dirs:
1366 # Note that this also mutates the dirs we appended to all!
1367 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001368
Victor Stinner0561c532015-03-12 10:28:24 +01001369 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001370 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001371
1372 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001373 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001374 self.assertEqual(all[1], self.sub2_tree)
1375
Brett Cannon3f9183b2016-08-26 14:44:48 -07001376 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001377 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001378
Victor Stinner0561c532015-03-12 10:28:24 +01001379 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001380 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001381 all = list(self.walk(self.walk_path, topdown=False))
1382
Victor Stinner53b0a412016-03-26 01:12:36 +01001383 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001384 # We can't know which order SUB1 and SUB2 will appear in.
1385 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1386 # flipped: SUB2, SUB11, SUB1, TESTFN
1387 flipped = all[3][1][0] != "SUB1"
1388 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001389 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001390 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001391 self.assertEqual(all[3],
1392 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1393 self.assertEqual(all[flipped],
1394 (self.sub11_path, [], []))
1395 self.assertEqual(all[flipped + 1],
1396 (self.sub1_path, ["SUB11"], ["tmp2"]))
1397 self.assertEqual(all[2 - 2 * flipped],
1398 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001399
Victor Stinner0561c532015-03-12 10:28:24 +01001400 def test_walk_symlink(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001401 if not os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001402 self.skipTest("need symlink support")
1403
1404 # Walk, following symlinks.
1405 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1406 for root, dirs, files in walk_it:
1407 if root == self.link_path:
1408 self.assertEqual(dirs, [])
1409 self.assertEqual(files, ["tmp4"])
1410 break
1411 else:
1412 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001413
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001414 def test_walk_bad_dir(self):
1415 # Walk top-down.
1416 errors = []
1417 walk_it = self.walk(self.walk_path, onerror=errors.append)
1418 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001419 self.assertEqual(errors, [])
1420 dir1 = 'SUB1'
1421 path1 = os.path.join(root, dir1)
1422 path1new = os.path.join(root, dir1 + '.new')
1423 os.rename(path1, path1new)
1424 try:
1425 roots = [r for r, d, f in walk_it]
1426 self.assertTrue(errors)
1427 self.assertNotIn(path1, roots)
1428 self.assertNotIn(path1new, roots)
1429 for dir2 in dirs:
1430 if dir2 != dir1:
1431 self.assertIn(os.path.join(root, dir2), roots)
1432 finally:
1433 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001434
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001435 def test_walk_many_open_files(self):
1436 depth = 30
Hai Shi0c4f0f32020-06-30 21:46:31 +08001437 base = os.path.join(os_helper.TESTFN, 'deep')
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001438 p = os.path.join(base, *(['d']*depth))
1439 os.makedirs(p)
1440
1441 iters = [self.walk(base, topdown=False) for j in range(100)]
1442 for i in range(depth + 1):
1443 expected = (p, ['d'] if i else [], [])
1444 for it in iters:
1445 self.assertEqual(next(it), expected)
1446 p = os.path.dirname(p)
1447
1448 iters = [self.walk(base, topdown=True) for j in range(100)]
1449 p = base
1450 for i in range(depth + 1):
1451 expected = (p, ['d'] if i < depth else [], [])
1452 for it in iters:
1453 self.assertEqual(next(it), expected)
1454 p = os.path.join(p, 'd')
1455
Charles-François Natali7372b062012-02-05 15:15:38 +01001456
1457@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1458class FwalkTests(WalkTests):
1459 """Tests for os.fwalk()."""
1460
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001461 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001462 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001463 yield (root, dirs, files)
1464
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001465 def fwalk(self, *args, **kwargs):
1466 return os.fwalk(*args, **kwargs)
1467
Larry Hastingsc48fe982012-06-25 04:49:05 -07001468 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1469 """
1470 compare with walk() results.
1471 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001472 walk_kwargs = walk_kwargs.copy()
1473 fwalk_kwargs = fwalk_kwargs.copy()
1474 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1475 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1476 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001477
Charles-François Natali7372b062012-02-05 15:15:38 +01001478 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001479 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001480 expected[root] = (set(dirs), set(files))
1481
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001482 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001483 self.assertIn(root, expected)
1484 self.assertEqual(expected[root], (set(dirs), set(files)))
1485
Larry Hastingsc48fe982012-06-25 04:49:05 -07001486 def test_compare_to_walk(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001487 kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001488 self._compare_to_walk(kwargs, kwargs)
1489
Charles-François Natali7372b062012-02-05 15:15:38 +01001490 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001491 try:
1492 fd = os.open(".", os.O_RDONLY)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001493 walk_kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001494 fwalk_kwargs = walk_kwargs.copy()
1495 fwalk_kwargs['dir_fd'] = fd
1496 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1497 finally:
1498 os.close(fd)
1499
1500 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001501 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001502 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001503 args = os_helper.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001504 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001505 # check that the FD is valid
1506 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001507 # redundant check
1508 os.stat(rootfd)
1509 # check that listdir() returns consistent information
1510 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001511
1512 def test_fd_leak(self):
1513 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1514 # we both check that calling fwalk() a large number of times doesn't
1515 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1516 minfd = os.dup(1)
1517 os.close(minfd)
1518 for i in range(256):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001519 for x in self.fwalk(os_helper.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001520 pass
1521 newfd = os.dup(1)
1522 self.addCleanup(os.close, newfd)
1523 self.assertEqual(newfd, minfd)
1524
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001525 # fwalk() keeps file descriptors open
1526 test_walk_many_open_files = None
1527
1528
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001529class BytesWalkTests(WalkTests):
1530 """Tests for os.walk() with bytes."""
1531 def walk(self, top, **kwargs):
1532 if 'follow_symlinks' in kwargs:
1533 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1534 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1535 root = os.fsdecode(broot)
1536 dirs = list(map(os.fsdecode, bdirs))
1537 files = list(map(os.fsdecode, bfiles))
1538 yield (root, dirs, files)
1539 bdirs[:] = list(map(os.fsencode, dirs))
1540 bfiles[:] = list(map(os.fsencode, files))
1541
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001542@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1543class BytesFwalkTests(FwalkTests):
1544 """Tests for os.walk() with bytes."""
1545 def fwalk(self, top='.', *args, **kwargs):
1546 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1547 root = os.fsdecode(broot)
1548 dirs = list(map(os.fsdecode, bdirs))
1549 files = list(map(os.fsdecode, bfiles))
1550 yield (root, dirs, files, topfd)
1551 bdirs[:] = list(map(os.fsencode, dirs))
1552 bfiles[:] = list(map(os.fsencode, files))
1553
Charles-François Natali7372b062012-02-05 15:15:38 +01001554
Guido van Rossume7ba4952007-06-06 23:52:48 +00001555class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001556 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001557 os.mkdir(os_helper.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001558
1559 def test_makedir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001560 base = os_helper.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001561 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1562 os.makedirs(path) # Should work
1563 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1564 os.makedirs(path)
1565
1566 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001567 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001568 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1569 os.makedirs(path)
1570 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1571 'dir5', 'dir6')
1572 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001573
Serhiy Storchakae304e332017-03-24 13:27:42 +02001574 def test_mode(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001575 with os_helper.temp_umask(0o002):
1576 base = os_helper.TESTFN
Serhiy Storchakae304e332017-03-24 13:27:42 +02001577 parent = os.path.join(base, 'dir1')
1578 path = os.path.join(parent, 'dir2')
1579 os.makedirs(path, 0o555)
1580 self.assertTrue(os.path.exists(path))
1581 self.assertTrue(os.path.isdir(path))
1582 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001583 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1584 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001585
Terry Reedy5a22b652010-12-02 07:05:56 +00001586 def test_exist_ok_existing_directory(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001587 path = os.path.join(os_helper.TESTFN, 'dir1')
Terry Reedy5a22b652010-12-02 07:05:56 +00001588 mode = 0o777
1589 old_mask = os.umask(0o022)
1590 os.makedirs(path, mode)
1591 self.assertRaises(OSError, os.makedirs, path, mode)
1592 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001593 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001594 os.makedirs(path, mode=mode, exist_ok=True)
1595 os.umask(old_mask)
1596
Martin Pantera82642f2015-11-19 04:48:44 +00001597 # Issue #25583: A drive root could raise PermissionError on Windows
1598 os.makedirs(os.path.abspath('/'), exist_ok=True)
1599
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001600 def test_exist_ok_s_isgid_directory(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001601 path = os.path.join(os_helper.TESTFN, 'dir1')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001602 S_ISGID = stat.S_ISGID
1603 mode = 0o777
1604 old_mask = os.umask(0o022)
1605 try:
1606 existing_testfn_mode = stat.S_IMODE(
Hai Shi0c4f0f32020-06-30 21:46:31 +08001607 os.lstat(os_helper.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001608 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08001609 os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001610 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001611 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Hai Shi0c4f0f32020-06-30 21:46:31 +08001612 if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001613 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1614 # The os should apply S_ISGID from the parent dir for us, but
1615 # this test need not depend on that behavior. Be explicit.
1616 os.makedirs(path, mode | S_ISGID)
1617 # http://bugs.python.org/issue14992
1618 # Should not fail when the bit is already set.
1619 os.makedirs(path, mode, exist_ok=True)
1620 # remove the bit.
1621 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001622 # May work even when the bit is not already set when demanded.
1623 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001624 finally:
1625 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001626
1627 def test_exist_ok_existing_regular_file(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001628 base = os_helper.TESTFN
1629 path = os.path.join(os_helper.TESTFN, 'dir1')
Inada Naokia6925652021-04-29 11:35:36 +09001630 with open(path, 'w', encoding='utf-8') as f:
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001631 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001632 self.assertRaises(OSError, os.makedirs, path)
1633 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1634 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1635 os.remove(path)
1636
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001637 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001638 path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001639 'dir4', 'dir5', 'dir6')
1640 # If the tests failed, the bottom-most directory ('../dir6')
1641 # may not have been created, so we look for the outermost directory
1642 # that exists.
Hai Shi0c4f0f32020-06-30 21:46:31 +08001643 while not os.path.exists(path) and path != os_helper.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001644 path = os.path.dirname(path)
1645
1646 os.removedirs(path)
1647
Andrew Svetlov405faed2012-12-25 12:18:09 +02001648
R David Murrayf2ad1732014-12-25 18:36:56 -05001649@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1650class ChownFileTests(unittest.TestCase):
1651
Berker Peksag036a71b2015-07-21 09:29:48 +03001652 @classmethod
1653 def setUpClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001654 os.mkdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001655
1656 def test_chown_uid_gid_arguments_must_be_index(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001657 stat = os.stat(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001658 uid = stat.st_uid
1659 gid = stat.st_gid
1660 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001661 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid)
1662 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value)
1663 self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
1664 self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))
R David Murrayf2ad1732014-12-25 18:36:56 -05001665
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001666 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1667 def test_chown_gid(self):
1668 groups = os.getgroups()
1669 if len(groups) < 2:
1670 self.skipTest("test needs at least 2 groups")
1671
R David Murrayf2ad1732014-12-25 18:36:56 -05001672 gid_1, gid_2 = groups[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001673 uid = os.stat(os_helper.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001674
Hai Shi0c4f0f32020-06-30 21:46:31 +08001675 os.chown(os_helper.TESTFN, uid, gid_1)
1676 gid = os.stat(os_helper.TESTFN).st_gid
R David Murrayf2ad1732014-12-25 18:36:56 -05001677 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001678
Hai Shi0c4f0f32020-06-30 21:46:31 +08001679 os.chown(os_helper.TESTFN, uid, gid_2)
1680 gid = os.stat(os_helper.TESTFN).st_gid
R David Murrayf2ad1732014-12-25 18:36:56 -05001681 self.assertEqual(gid, gid_2)
1682
1683 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1684 "test needs root privilege and more than one user")
1685 def test_chown_with_root(self):
1686 uid_1, uid_2 = all_users[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001687 gid = os.stat(os_helper.TESTFN).st_gid
1688 os.chown(os_helper.TESTFN, uid_1, gid)
1689 uid = os.stat(os_helper.TESTFN).st_uid
R David Murrayf2ad1732014-12-25 18:36:56 -05001690 self.assertEqual(uid, uid_1)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001691 os.chown(os_helper.TESTFN, uid_2, gid)
1692 uid = os.stat(os_helper.TESTFN).st_uid
R David Murrayf2ad1732014-12-25 18:36:56 -05001693 self.assertEqual(uid, uid_2)
1694
1695 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1696 "test needs non-root account and more than one user")
1697 def test_chown_without_permission(self):
1698 uid_1, uid_2 = all_users[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001699 gid = os.stat(os_helper.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001700 with self.assertRaises(PermissionError):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001701 os.chown(os_helper.TESTFN, uid_1, gid)
1702 os.chown(os_helper.TESTFN, uid_2, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001703
Berker Peksag036a71b2015-07-21 09:29:48 +03001704 @classmethod
1705 def tearDownClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001706 os.rmdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001707
1708
Andrew Svetlov405faed2012-12-25 12:18:09 +02001709class RemoveDirsTests(unittest.TestCase):
1710 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001711 os.makedirs(os_helper.TESTFN)
Andrew Svetlov405faed2012-12-25 12:18:09 +02001712
1713 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001714 os_helper.rmtree(os_helper.TESTFN)
Andrew Svetlov405faed2012-12-25 12:18:09 +02001715
1716 def test_remove_all(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001717 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001718 os.mkdir(dira)
1719 dirb = os.path.join(dira, 'dirb')
1720 os.mkdir(dirb)
1721 os.removedirs(dirb)
1722 self.assertFalse(os.path.exists(dirb))
1723 self.assertFalse(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001724 self.assertFalse(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001725
1726 def test_remove_partial(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001727 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001728 os.mkdir(dira)
1729 dirb = os.path.join(dira, 'dirb')
1730 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001731 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001732 os.removedirs(dirb)
1733 self.assertFalse(os.path.exists(dirb))
1734 self.assertTrue(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001735 self.assertTrue(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001736
1737 def test_remove_nothing(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001738 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001739 os.mkdir(dira)
1740 dirb = os.path.join(dira, 'dirb')
1741 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001742 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001743 with self.assertRaises(OSError):
1744 os.removedirs(dirb)
1745 self.assertTrue(os.path.exists(dirb))
1746 self.assertTrue(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001747 self.assertTrue(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001748
1749
Guido van Rossume7ba4952007-06-06 23:52:48 +00001750class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001751 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001752 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001753 f.write(b'hello')
1754 f.close()
1755 with open(os.devnull, 'rb') as f:
1756 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001757
Andrew Svetlov405faed2012-12-25 12:18:09 +02001758
Guido van Rossume7ba4952007-06-06 23:52:48 +00001759class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001760 def test_urandom_length(self):
1761 self.assertEqual(len(os.urandom(0)), 0)
1762 self.assertEqual(len(os.urandom(1)), 1)
1763 self.assertEqual(len(os.urandom(10)), 10)
1764 self.assertEqual(len(os.urandom(100)), 100)
1765 self.assertEqual(len(os.urandom(1000)), 1000)
1766
1767 def test_urandom_value(self):
1768 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001769 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001770 data2 = os.urandom(16)
1771 self.assertNotEqual(data1, data2)
1772
1773 def get_urandom_subprocess(self, count):
1774 code = '\n'.join((
1775 'import os, sys',
1776 'data = os.urandom(%s)' % count,
1777 'sys.stdout.buffer.write(data)',
1778 'sys.stdout.buffer.flush()'))
1779 out = assert_python_ok('-c', code)
1780 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001781 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001782 return stdout
1783
1784 def test_urandom_subprocess(self):
1785 data1 = self.get_urandom_subprocess(16)
1786 data2 = self.get_urandom_subprocess(16)
1787 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001788
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001789
Victor Stinner9b1f4742016-09-06 16:18:52 -07001790@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1791class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001792 @classmethod
1793 def setUpClass(cls):
1794 try:
1795 os.getrandom(1)
1796 except OSError as exc:
1797 if exc.errno == errno.ENOSYS:
1798 # Python compiled on a more recent Linux version
1799 # than the current Linux kernel
1800 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1801 else:
1802 raise
1803
Victor Stinner9b1f4742016-09-06 16:18:52 -07001804 def test_getrandom_type(self):
1805 data = os.getrandom(16)
1806 self.assertIsInstance(data, bytes)
1807 self.assertEqual(len(data), 16)
1808
1809 def test_getrandom0(self):
1810 empty = os.getrandom(0)
1811 self.assertEqual(empty, b'')
1812
1813 def test_getrandom_random(self):
1814 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1815
1816 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1817 # resource /dev/random
1818
1819 def test_getrandom_nonblock(self):
1820 # The call must not fail. Check also that the flag exists
1821 try:
1822 os.getrandom(1, os.GRND_NONBLOCK)
1823 except BlockingIOError:
1824 # System urandom is not initialized yet
1825 pass
1826
1827 def test_getrandom_value(self):
1828 data1 = os.getrandom(16)
1829 data2 = os.getrandom(16)
1830 self.assertNotEqual(data1, data2)
1831
1832
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001833# os.urandom() doesn't use a file descriptor when it is implemented with the
1834# getentropy() function, the getrandom() function or the getrandom() syscall
1835OS_URANDOM_DONT_USE_FD = (
1836 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1837 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1838 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001839
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001840@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1841 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001842@unittest.skipIf(sys.platform == "vxworks",
1843 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001844class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001845 @unittest.skipUnless(resource, "test requires the resource module")
1846 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001847 # Check urandom() failing when it is not able to open /dev/random.
1848 # We spawn a new process to make the test more robust (if getrlimit()
1849 # failed to restore the file descriptor limit after this, the whole
1850 # test suite would crash; this actually happened on the OS X Tiger
1851 # buildbot).
1852 code = """if 1:
1853 import errno
1854 import os
1855 import resource
1856
1857 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1858 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1859 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001860 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001861 except OSError as e:
1862 assert e.errno == errno.EMFILE, e.errno
1863 else:
1864 raise AssertionError("OSError not raised")
1865 """
1866 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001867
Antoine Pitroue472aea2014-04-26 14:33:03 +02001868 def test_urandom_fd_closed(self):
1869 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1870 # closed.
1871 code = """if 1:
1872 import os
1873 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001874 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001875 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001876 with test.support.SuppressCrashReport():
1877 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001878 sys.stdout.buffer.write(os.urandom(4))
1879 """
1880 rc, out, err = assert_python_ok('-Sc', code)
1881
1882 def test_urandom_fd_reopened(self):
1883 # Issue #21207: urandom() should detect its fd to /dev/urandom
1884 # changed to something else, and reopen it.
Hai Shi0c4f0f32020-06-30 21:46:31 +08001885 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1886 create_file(os_helper.TESTFN, b"x" * 256)
Victor Stinnerae39d232016-03-24 17:12:55 +01001887
Antoine Pitroue472aea2014-04-26 14:33:03 +02001888 code = """if 1:
1889 import os
1890 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001891 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001892 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001893 with test.support.SuppressCrashReport():
1894 for fd in range(3, 256):
1895 try:
1896 os.close(fd)
1897 except OSError:
1898 pass
1899 else:
1900 # Found the urandom fd (XXX hopefully)
1901 break
1902 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001903 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001904 new_fd = f.fileno()
1905 # Issue #26935: posix allows new_fd and fd to be equal but
1906 # some libc implementations have dup2 return an error in this
1907 # case.
1908 if new_fd != fd:
1909 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001910 sys.stdout.buffer.write(os.urandom(4))
1911 sys.stdout.buffer.write(os.urandom(4))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001912 """.format(TESTFN=os_helper.TESTFN)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001913 rc, out, err = assert_python_ok('-Sc', code)
1914 self.assertEqual(len(out), 8)
1915 self.assertNotEqual(out[0:4], out[4:8])
1916 rc, out2, err2 = assert_python_ok('-Sc', code)
1917 self.assertEqual(len(out2), 8)
1918 self.assertNotEqual(out2, out)
1919
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001920
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001921@contextlib.contextmanager
1922def _execvpe_mockup(defpath=None):
1923 """
1924 Stubs out execv and execve functions when used as context manager.
1925 Records exec calls. The mock execv and execve functions always raise an
1926 exception as they would normally never return.
1927 """
1928 # A list of tuples containing (function name, first arg, args)
1929 # of calls to execv or execve that have been made.
1930 calls = []
1931
1932 def mock_execv(name, *args):
1933 calls.append(('execv', name, args))
1934 raise RuntimeError("execv called")
1935
1936 def mock_execve(name, *args):
1937 calls.append(('execve', name, args))
1938 raise OSError(errno.ENOTDIR, "execve called")
1939
1940 try:
1941 orig_execv = os.execv
1942 orig_execve = os.execve
1943 orig_defpath = os.defpath
1944 os.execv = mock_execv
1945 os.execve = mock_execve
1946 if defpath is not None:
1947 os.defpath = defpath
1948 yield calls
1949 finally:
1950 os.execv = orig_execv
1951 os.execve = orig_execve
1952 os.defpath = orig_defpath
1953
pxinwrf2d7ac72019-05-21 18:46:37 +08001954@unittest.skipUnless(hasattr(os, 'execv'),
1955 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001956class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001957 @unittest.skipIf(USING_LINUXTHREADS,
1958 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001959 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001960 self.assertRaises(OSError, os.execvpe, 'no such app-',
1961 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001962
Steve Dowerbce26262016-11-19 19:17:26 -08001963 def test_execv_with_bad_arglist(self):
1964 self.assertRaises(ValueError, os.execv, 'notepad', ())
1965 self.assertRaises(ValueError, os.execv, 'notepad', [])
1966 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1967 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1968
Thomas Heller6790d602007-08-30 17:15:14 +00001969 def test_execvpe_with_bad_arglist(self):
1970 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001971 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1972 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001973
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001974 @unittest.skipUnless(hasattr(os, '_execvpe'),
1975 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001976 def _test_internal_execvpe(self, test_type):
1977 program_path = os.sep + 'absolutepath'
1978 if test_type is bytes:
1979 program = b'executable'
1980 fullpath = os.path.join(os.fsencode(program_path), program)
1981 native_fullpath = fullpath
1982 arguments = [b'progname', 'arg1', 'arg2']
1983 else:
1984 program = 'executable'
1985 arguments = ['progname', 'arg1', 'arg2']
1986 fullpath = os.path.join(program_path, program)
1987 if os.name != "nt":
1988 native_fullpath = os.fsencode(fullpath)
1989 else:
1990 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001991 env = {'spam': 'beans'}
1992
Victor Stinnerb745a742010-05-18 17:17:23 +00001993 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001994 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001995 self.assertRaises(RuntimeError,
1996 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001997 self.assertEqual(len(calls), 1)
1998 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1999
Victor Stinnerb745a742010-05-18 17:17:23 +00002000 # test os._execvpe() with a relative path:
2001 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002002 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00002003 self.assertRaises(OSError,
2004 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002005 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00002006 self.assertSequenceEqual(calls[0],
2007 ('execve', native_fullpath, (arguments, env)))
2008
2009 # test os._execvpe() with a relative path:
2010 # os.get_exec_path() reads the 'PATH' variable
2011 with _execvpe_mockup() as calls:
2012 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00002013 if test_type is bytes:
2014 env_path[b'PATH'] = program_path
2015 else:
2016 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00002017 self.assertRaises(OSError,
2018 os._execvpe, program, arguments, env=env_path)
2019 self.assertEqual(len(calls), 1)
2020 self.assertSequenceEqual(calls[0],
2021 ('execve', native_fullpath, (arguments, env_path)))
2022
2023 def test_internal_execvpe_str(self):
2024 self._test_internal_execvpe(str)
2025 if os.name != "nt":
2026 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002027
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002028 def test_execve_invalid_env(self):
2029 args = [sys.executable, '-c', 'pass']
2030
Ville Skyttä49b27342017-08-03 09:00:59 +03002031 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002032 newenv = os.environ.copy()
2033 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2034 with self.assertRaises(ValueError):
2035 os.execve(args[0], args, newenv)
2036
Ville Skyttä49b27342017-08-03 09:00:59 +03002037 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002038 newenv = os.environ.copy()
2039 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2040 with self.assertRaises(ValueError):
2041 os.execve(args[0], args, newenv)
2042
Ville Skyttä49b27342017-08-03 09:00:59 +03002043 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002044 newenv = os.environ.copy()
2045 newenv["FRUIT=ORANGE"] = "lemon"
2046 with self.assertRaises(ValueError):
2047 os.execve(args[0], args, newenv)
2048
Alexey Izbyshev83460312018-10-20 03:28:22 +03002049 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
2050 def test_execve_with_empty_path(self):
2051 # bpo-32890: Check GetLastError() misuse
2052 try:
2053 os.execve('', ['arg'], {})
2054 except OSError as e:
2055 self.assertTrue(e.winerror is None or e.winerror != 0)
2056 else:
2057 self.fail('No OSError raised')
2058
Gregory P. Smith4ae37772010-05-08 18:05:46 +00002059
Serhiy Storchaka43767632013-11-03 21:31:38 +02002060@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00002061class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01002062 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01002063 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002064 os.stat(os_helper.TESTFN)
Victor Stinner32830142016-03-25 15:12:08 +01002065 except FileNotFoundError:
2066 exists = False
2067 except OSError as exc:
2068 exists = True
2069 self.fail("file %s must not exist; os.stat failed with %s"
Hai Shi0c4f0f32020-06-30 21:46:31 +08002070 % (os_helper.TESTFN, exc))
Victor Stinner32830142016-03-25 15:12:08 +01002071 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002072 self.fail("file %s must not exist" % os_helper.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01002073
Thomas Wouters477c8d52006-05-27 19:21:47 +00002074 def test_rename(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002075 self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00002076
2077 def test_remove(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002078 self.assertRaises(OSError, os.remove, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002079
2080 def test_chdir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002081 self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002082
2083 def test_mkdir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002084 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002085
Hai Shi0c4f0f32020-06-30 21:46:31 +08002086 with open(os_helper.TESTFN, "x") as f:
2087 self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002088
2089 def test_utime(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002090 self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002091
Thomas Wouters477c8d52006-05-27 19:21:47 +00002092 def test_chmod(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002093 self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002094
Victor Stinnere77c9742016-03-25 10:28:23 +01002095
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002096class TestInvalidFD(unittest.TestCase):
Inada Naokia6925652021-04-29 11:35:36 +09002097 singles = ["fchdir", "dup", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002098 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2099 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07002100 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002101 def get_single(f):
2102 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00002103 if hasattr(os, f):
2104 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002105 return helper
2106 for f in singles:
2107 locals()["test_"+f] = get_single(f)
2108
Inada Naokia6925652021-04-29 11:35:36 +09002109 def check(self, f, *args, **kwargs):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002110 try:
Inada Naokia6925652021-04-29 11:35:36 +09002111 f(os_helper.make_bad_fd(), *args, **kwargs)
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002112 except OSError as e:
2113 self.assertEqual(e.errno, errno.EBADF)
2114 else:
Martin Panter7462b6492015-11-02 03:37:02 +00002115 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002116 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00002117
Inada Naokia6925652021-04-29 11:35:36 +09002118 def test_fdopen(self):
2119 self.check(os.fdopen, encoding="utf-8")
2120
Serhiy Storchaka43767632013-11-03 21:31:38 +02002121 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002122 def test_isatty(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002123 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002124
Serhiy Storchaka43767632013-11-03 21:31:38 +02002125 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002126 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002127 fd = os_helper.make_bad_fd()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002128 # Make sure none of the descriptors we are about to close are
2129 # currently valid (issue 6542).
2130 for i in range(10):
2131 try: os.fstat(fd+i)
2132 except OSError:
2133 pass
2134 else:
2135 break
2136 if i < 2:
2137 raise unittest.SkipTest(
2138 "Unable to acquire a range of invalid file descriptors")
2139 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002140
Serhiy Storchaka43767632013-11-03 21:31:38 +02002141 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002142 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002143 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002144
Serhiy Storchaka43767632013-11-03 21:31:38 +02002145 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002146 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002147 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002148
Serhiy Storchaka43767632013-11-03 21:31:38 +02002149 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002150 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002151 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002152
Serhiy Storchaka43767632013-11-03 21:31:38 +02002153 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002154 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002155 self.check(os.pathconf, "PC_NAME_MAX")
2156 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002157
Serhiy Storchaka43767632013-11-03 21:31:38 +02002158 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002159 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002160 self.check(os.truncate, 0)
2161 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002162
Serhiy Storchaka43767632013-11-03 21:31:38 +02002163 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002164 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002165 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002166
Serhiy Storchaka43767632013-11-03 21:31:38 +02002167 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002168 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002169 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002170
Victor Stinner57ddf782014-01-08 15:21:28 +01002171 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2172 def test_readv(self):
2173 buf = bytearray(10)
2174 self.check(os.readv, [buf])
2175
Serhiy Storchaka43767632013-11-03 21:31:38 +02002176 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002177 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002178 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002179
Serhiy Storchaka43767632013-11-03 21:31:38 +02002180 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002181 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002182 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002183
Victor Stinner57ddf782014-01-08 15:21:28 +01002184 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2185 def test_writev(self):
2186 self.check(os.writev, [b'abc'])
2187
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002188 def test_inheritable(self):
2189 self.check(os.get_inheritable)
2190 self.check(os.set_inheritable, True)
2191
2192 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2193 'needs os.get_blocking() and os.set_blocking()')
2194 def test_blocking(self):
2195 self.check(os.get_blocking)
2196 self.check(os.set_blocking, True)
2197
Brian Curtin1b9df392010-11-24 20:24:31 +00002198
2199class LinkTests(unittest.TestCase):
2200 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002201 self.file1 = os_helper.TESTFN
2202 self.file2 = os.path.join(os_helper.TESTFN + "2")
Brian Curtin1b9df392010-11-24 20:24:31 +00002203
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002204 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002205 for file in (self.file1, self.file2):
2206 if os.path.exists(file):
2207 os.unlink(file)
2208
Brian Curtin1b9df392010-11-24 20:24:31 +00002209 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002210 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002211
xdegaye6a55d092017-11-12 17:57:04 +01002212 try:
2213 os.link(file1, file2)
2214 except PermissionError as e:
2215 self.skipTest('os.link(): %s' % e)
Inada Naokia6925652021-04-29 11:35:36 +09002216 with open(file1, "rb") as f1, open(file2, "rb") as f2:
Brian Curtin1b9df392010-11-24 20:24:31 +00002217 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2218
2219 def test_link(self):
2220 self._test_link(self.file1, self.file2)
2221
2222 def test_link_bytes(self):
2223 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2224 bytes(self.file2, sys.getfilesystemencoding()))
2225
Brian Curtinf498b752010-11-30 15:54:04 +00002226 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002227 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002228 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002229 except UnicodeError:
2230 raise unittest.SkipTest("Unable to encode for this platform.")
2231
Brian Curtinf498b752010-11-30 15:54:04 +00002232 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002233 self.file2 = self.file1 + "2"
2234 self._test_link(self.file1, self.file2)
2235
Serhiy Storchaka43767632013-11-03 21:31:38 +02002236@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2237class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002238 # uid_t and gid_t are 32-bit unsigned integers on Linux
2239 UID_OVERFLOW = (1 << 32)
2240 GID_OVERFLOW = (1 << 32)
2241
Serhiy Storchaka43767632013-11-03 21:31:38 +02002242 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2243 def test_setuid(self):
2244 if os.getuid() != 0:
2245 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002246 self.assertRaises(TypeError, os.setuid, 'not an int')
2247 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002248
Serhiy Storchaka43767632013-11-03 21:31:38 +02002249 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2250 def test_setgid(self):
2251 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2252 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002253 self.assertRaises(TypeError, os.setgid, 'not an int')
2254 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002255
Serhiy Storchaka43767632013-11-03 21:31:38 +02002256 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2257 def test_seteuid(self):
2258 if os.getuid() != 0:
2259 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002260 self.assertRaises(TypeError, os.setegid, 'not an int')
2261 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002262
Serhiy Storchaka43767632013-11-03 21:31:38 +02002263 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2264 def test_setegid(self):
2265 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2266 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002267 self.assertRaises(TypeError, os.setegid, 'not an int')
2268 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002269
Serhiy Storchaka43767632013-11-03 21:31:38 +02002270 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2271 def test_setreuid(self):
2272 if os.getuid() != 0:
2273 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002274 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2275 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2276 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2277 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002278
Serhiy Storchaka43767632013-11-03 21:31:38 +02002279 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2280 def test_setreuid_neg1(self):
2281 # Needs to accept -1. We run this in a subprocess to avoid
2282 # altering the test runner's process state (issue8045).
2283 subprocess.check_call([
2284 sys.executable, '-c',
2285 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002286
Serhiy Storchaka43767632013-11-03 21:31:38 +02002287 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2288 def test_setregid(self):
2289 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2290 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002291 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2292 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2293 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2294 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002295
Serhiy Storchaka43767632013-11-03 21:31:38 +02002296 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2297 def test_setregid_neg1(self):
2298 # Needs to accept -1. We run this in a subprocess to avoid
2299 # altering the test runner's process state (issue8045).
2300 subprocess.check_call([
2301 sys.executable, '-c',
2302 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002303
Serhiy Storchaka43767632013-11-03 21:31:38 +02002304@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2305class Pep383Tests(unittest.TestCase):
2306 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002307 if os_helper.TESTFN_UNENCODABLE:
2308 self.dir = os_helper.TESTFN_UNENCODABLE
2309 elif os_helper.TESTFN_NONASCII:
2310 self.dir = os_helper.TESTFN_NONASCII
Serhiy Storchaka43767632013-11-03 21:31:38 +02002311 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002312 self.dir = os_helper.TESTFN
Serhiy Storchaka43767632013-11-03 21:31:38 +02002313 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002314
Serhiy Storchaka43767632013-11-03 21:31:38 +02002315 bytesfn = []
2316 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002317 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002318 fn = os.fsencode(fn)
2319 except UnicodeEncodeError:
2320 return
2321 bytesfn.append(fn)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002322 add_filename(os_helper.TESTFN_UNICODE)
2323 if os_helper.TESTFN_UNENCODABLE:
2324 add_filename(os_helper.TESTFN_UNENCODABLE)
2325 if os_helper.TESTFN_NONASCII:
2326 add_filename(os_helper.TESTFN_NONASCII)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002327 if not bytesfn:
2328 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002329
Serhiy Storchaka43767632013-11-03 21:31:38 +02002330 self.unicodefn = set()
2331 os.mkdir(self.dir)
2332 try:
2333 for fn in bytesfn:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002334 os_helper.create_empty_file(os.path.join(self.bdir, fn))
Serhiy Storchaka43767632013-11-03 21:31:38 +02002335 fn = os.fsdecode(fn)
2336 if fn in self.unicodefn:
2337 raise ValueError("duplicate filename")
2338 self.unicodefn.add(fn)
2339 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002340 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002341 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002342
Serhiy Storchaka43767632013-11-03 21:31:38 +02002343 def tearDown(self):
2344 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002345
Serhiy Storchaka43767632013-11-03 21:31:38 +02002346 def test_listdir(self):
2347 expected = self.unicodefn
2348 found = set(os.listdir(self.dir))
2349 self.assertEqual(found, expected)
2350 # test listdir without arguments
2351 current_directory = os.getcwd()
2352 try:
2353 os.chdir(os.sep)
2354 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2355 finally:
2356 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002357
Serhiy Storchaka43767632013-11-03 21:31:38 +02002358 def test_open(self):
2359 for fn in self.unicodefn:
2360 f = open(os.path.join(self.dir, fn), 'rb')
2361 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002362
Serhiy Storchaka43767632013-11-03 21:31:38 +02002363 @unittest.skipUnless(hasattr(os, 'statvfs'),
2364 "need os.statvfs()")
2365 def test_statvfs(self):
2366 # issue #9645
2367 for fn in self.unicodefn:
2368 # should not fail with file not found error
2369 fullname = os.path.join(self.dir, fn)
2370 os.statvfs(fullname)
2371
2372 def test_stat(self):
2373 for fn in self.unicodefn:
2374 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002375
Brian Curtineb24d742010-04-12 17:16:38 +00002376@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2377class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002378 def _kill(self, sig):
2379 # Start sys.executable as a subprocess and communicate from the
2380 # subprocess to the parent that the interpreter is ready. When it
2381 # becomes ready, send *sig* via os.kill to the subprocess and check
2382 # that the return code is equal to *sig*.
2383 import ctypes
2384 from ctypes import wintypes
2385 import msvcrt
2386
2387 # Since we can't access the contents of the process' stdout until the
2388 # process has exited, use PeekNamedPipe to see what's inside stdout
2389 # without waiting. This is done so we can tell that the interpreter
2390 # is started and running at a point where it could handle a signal.
2391 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2392 PeekNamedPipe.restype = wintypes.BOOL
2393 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2394 ctypes.POINTER(ctypes.c_char), # stdout buf
2395 wintypes.DWORD, # Buffer size
2396 ctypes.POINTER(wintypes.DWORD), # bytes read
2397 ctypes.POINTER(wintypes.DWORD), # bytes avail
2398 ctypes.POINTER(wintypes.DWORD)) # bytes left
2399 msg = "running"
2400 proc = subprocess.Popen([sys.executable, "-c",
2401 "import sys;"
2402 "sys.stdout.write('{}');"
2403 "sys.stdout.flush();"
2404 "input()".format(msg)],
2405 stdout=subprocess.PIPE,
2406 stderr=subprocess.PIPE,
2407 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002408 self.addCleanup(proc.stdout.close)
2409 self.addCleanup(proc.stderr.close)
2410 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002411
2412 count, max = 0, 100
2413 while count < max and proc.poll() is None:
2414 # Create a string buffer to store the result of stdout from the pipe
2415 buf = ctypes.create_string_buffer(len(msg))
2416 # Obtain the text currently in proc.stdout
2417 # Bytes read/avail/left are left as NULL and unused
2418 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2419 buf, ctypes.sizeof(buf), None, None, None)
2420 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2421 if buf.value:
2422 self.assertEqual(msg, buf.value.decode())
2423 break
2424 time.sleep(0.1)
2425 count += 1
2426 else:
2427 self.fail("Did not receive communication from the subprocess")
2428
Brian Curtineb24d742010-04-12 17:16:38 +00002429 os.kill(proc.pid, sig)
2430 self.assertEqual(proc.wait(), sig)
2431
2432 def test_kill_sigterm(self):
2433 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002434 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002435
2436 def test_kill_int(self):
2437 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002438 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002439
2440 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002441 tagname = "test_os_%s" % uuid.uuid1()
2442 m = mmap.mmap(-1, 1, tagname)
2443 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002444 # Run a script which has console control handling enabled.
2445 proc = subprocess.Popen([sys.executable,
2446 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002447 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002448 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2449 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002450 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002451 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002452 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002453 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002454 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002455 count += 1
2456 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002457 # Forcefully kill the process if we weren't able to signal it.
2458 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002459 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002460 os.kill(proc.pid, event)
2461 # proc.send_signal(event) could also be done here.
2462 # Allow time for the signal to be passed and the process to exit.
2463 time.sleep(0.5)
2464 if not proc.poll():
2465 # Forcefully kill the process if we weren't able to signal it.
2466 os.kill(proc.pid, signal.SIGINT)
2467 self.fail("subprocess did not stop on {}".format(name))
2468
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002469 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002470 def test_CTRL_C_EVENT(self):
2471 from ctypes import wintypes
2472 import ctypes
2473
2474 # Make a NULL value by creating a pointer with no argument.
2475 NULL = ctypes.POINTER(ctypes.c_int)()
2476 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2477 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2478 wintypes.BOOL)
2479 SetConsoleCtrlHandler.restype = wintypes.BOOL
2480
2481 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002482 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002483 # by subprocesses.
2484 SetConsoleCtrlHandler(NULL, 0)
2485
2486 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2487
2488 def test_CTRL_BREAK_EVENT(self):
2489 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2490
2491
Brian Curtind40e6f72010-07-08 21:39:08 +00002492@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002493class Win32ListdirTests(unittest.TestCase):
2494 """Test listdir on Windows."""
2495
2496 def setUp(self):
2497 self.created_paths = []
2498 for i in range(2):
2499 dir_name = 'SUB%d' % i
Hai Shi0c4f0f32020-06-30 21:46:31 +08002500 dir_path = os.path.join(os_helper.TESTFN, dir_name)
Tim Golden781bbeb2013-10-25 20:24:06 +01002501 file_name = 'FILE%d' % i
Hai Shi0c4f0f32020-06-30 21:46:31 +08002502 file_path = os.path.join(os_helper.TESTFN, file_name)
Tim Golden781bbeb2013-10-25 20:24:06 +01002503 os.makedirs(dir_path)
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03002504 with open(file_path, 'w', encoding='utf-8') as f:
Tim Golden781bbeb2013-10-25 20:24:06 +01002505 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2506 self.created_paths.extend([dir_name, file_name])
2507 self.created_paths.sort()
2508
2509 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002510 shutil.rmtree(os_helper.TESTFN)
Tim Golden781bbeb2013-10-25 20:24:06 +01002511
2512 def test_listdir_no_extended_path(self):
2513 """Test when the path is not an "extended" path."""
2514 # unicode
2515 self.assertEqual(
Hai Shi0c4f0f32020-06-30 21:46:31 +08002516 sorted(os.listdir(os_helper.TESTFN)),
Tim Golden781bbeb2013-10-25 20:24:06 +01002517 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002518
Tim Golden781bbeb2013-10-25 20:24:06 +01002519 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002520 self.assertEqual(
Hai Shi0c4f0f32020-06-30 21:46:31 +08002521 sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
Steve Dowercc16be82016-09-08 10:35:16 -07002522 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002523
2524 def test_listdir_extended_path(self):
2525 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002526 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002527 # unicode
Hai Shi0c4f0f32020-06-30 21:46:31 +08002528 path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
Tim Golden781bbeb2013-10-25 20:24:06 +01002529 self.assertEqual(
2530 sorted(os.listdir(path)),
2531 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002532
Tim Golden781bbeb2013-10-25 20:24:06 +01002533 # bytes
Hai Shi0c4f0f32020-06-30 21:46:31 +08002534 path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
Steve Dowercc16be82016-09-08 10:35:16 -07002535 self.assertEqual(
2536 sorted(os.listdir(path)),
2537 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002538
2539
Berker Peksage0b5b202018-08-15 13:03:41 +03002540@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2541class ReadlinkTests(unittest.TestCase):
2542 filelink = 'readlinktest'
2543 filelink_target = os.path.abspath(__file__)
2544 filelinkb = os.fsencode(filelink)
2545 filelinkb_target = os.fsencode(filelink_target)
2546
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002547 def assertPathEqual(self, left, right):
2548 left = os.path.normcase(left)
2549 right = os.path.normcase(right)
2550 if sys.platform == 'win32':
2551 # Bad practice to blindly strip the prefix as it may be required to
2552 # correctly refer to the file, but we're only comparing paths here.
2553 has_prefix = lambda p: p.startswith(
2554 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2555 if has_prefix(left):
2556 left = left[4:]
2557 if has_prefix(right):
2558 right = right[4:]
2559 self.assertEqual(left, right)
2560
Berker Peksage0b5b202018-08-15 13:03:41 +03002561 def setUp(self):
2562 self.assertTrue(os.path.exists(self.filelink_target))
2563 self.assertTrue(os.path.exists(self.filelinkb_target))
2564 self.assertFalse(os.path.exists(self.filelink))
2565 self.assertFalse(os.path.exists(self.filelinkb))
2566
2567 def test_not_symlink(self):
2568 filelink_target = FakePath(self.filelink_target)
2569 self.assertRaises(OSError, os.readlink, self.filelink_target)
2570 self.assertRaises(OSError, os.readlink, filelink_target)
2571
2572 def test_missing_link(self):
2573 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2574 self.assertRaises(FileNotFoundError, os.readlink,
2575 FakePath('missing-link'))
2576
Hai Shi0c4f0f32020-06-30 21:46:31 +08002577 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002578 def test_pathlike(self):
2579 os.symlink(self.filelink_target, self.filelink)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002580 self.addCleanup(os_helper.unlink, self.filelink)
Berker Peksage0b5b202018-08-15 13:03:41 +03002581 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002582 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002583
Hai Shi0c4f0f32020-06-30 21:46:31 +08002584 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002585 def test_pathlike_bytes(self):
2586 os.symlink(self.filelinkb_target, self.filelinkb)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002587 self.addCleanup(os_helper.unlink, self.filelinkb)
Berker Peksage0b5b202018-08-15 13:03:41 +03002588 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002589 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002590 self.assertIsInstance(path, bytes)
2591
Hai Shi0c4f0f32020-06-30 21:46:31 +08002592 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002593 def test_bytes(self):
2594 os.symlink(self.filelinkb_target, self.filelinkb)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002595 self.addCleanup(os_helper.unlink, self.filelinkb)
Berker Peksage0b5b202018-08-15 13:03:41 +03002596 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002597 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002598 self.assertIsInstance(path, bytes)
2599
2600
Tim Golden781bbeb2013-10-25 20:24:06 +01002601@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002602@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002603class Win32SymlinkTests(unittest.TestCase):
2604 filelink = 'filelinktest'
2605 filelink_target = os.path.abspath(__file__)
2606 dirlink = 'dirlinktest'
2607 dirlink_target = os.path.dirname(filelink_target)
2608 missing_link = 'missing link'
2609
2610 def setUp(self):
2611 assert os.path.exists(self.dirlink_target)
2612 assert os.path.exists(self.filelink_target)
2613 assert not os.path.exists(self.dirlink)
2614 assert not os.path.exists(self.filelink)
2615 assert not os.path.exists(self.missing_link)
2616
2617 def tearDown(self):
2618 if os.path.exists(self.filelink):
2619 os.remove(self.filelink)
2620 if os.path.exists(self.dirlink):
2621 os.rmdir(self.dirlink)
2622 if os.path.lexists(self.missing_link):
2623 os.remove(self.missing_link)
2624
2625 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002626 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002627 self.assertTrue(os.path.exists(self.dirlink))
2628 self.assertTrue(os.path.isdir(self.dirlink))
2629 self.assertTrue(os.path.islink(self.dirlink))
2630 self.check_stat(self.dirlink, self.dirlink_target)
2631
2632 def test_file_link(self):
2633 os.symlink(self.filelink_target, self.filelink)
2634 self.assertTrue(os.path.exists(self.filelink))
2635 self.assertTrue(os.path.isfile(self.filelink))
2636 self.assertTrue(os.path.islink(self.filelink))
2637 self.check_stat(self.filelink, self.filelink_target)
2638
2639 def _create_missing_dir_link(self):
2640 'Create a "directory" link to a non-existent target'
2641 linkname = self.missing_link
2642 if os.path.lexists(linkname):
2643 os.remove(linkname)
2644 target = r'c:\\target does not exist.29r3c740'
2645 assert not os.path.exists(target)
2646 target_is_dir = True
2647 os.symlink(target, linkname, target_is_dir)
2648
2649 def test_remove_directory_link_to_missing_target(self):
2650 self._create_missing_dir_link()
2651 # For compatibility with Unix, os.remove will check the
2652 # directory status and call RemoveDirectory if the symlink
2653 # was created with target_is_dir==True.
2654 os.remove(self.missing_link)
2655
Brian Curtind40e6f72010-07-08 21:39:08 +00002656 def test_isdir_on_directory_link_to_missing_target(self):
2657 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002658 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002659
Brian Curtind40e6f72010-07-08 21:39:08 +00002660 def test_rmdir_on_directory_link_to_missing_target(self):
2661 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002662 os.rmdir(self.missing_link)
2663
2664 def check_stat(self, link, target):
2665 self.assertEqual(os.stat(link), os.stat(target))
2666 self.assertNotEqual(os.lstat(link), os.stat(link))
2667
Brian Curtind25aef52011-06-13 15:16:04 -05002668 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002669 self.assertEqual(os.stat(bytes_link), os.stat(target))
2670 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002671
2672 def test_12084(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002673 level1 = os.path.abspath(os_helper.TESTFN)
Brian Curtind25aef52011-06-13 15:16:04 -05002674 level2 = os.path.join(level1, "level2")
2675 level3 = os.path.join(level2, "level3")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002676 self.addCleanup(os_helper.rmtree, level1)
Victor Stinnerae39d232016-03-24 17:12:55 +01002677
2678 os.mkdir(level1)
2679 os.mkdir(level2)
2680 os.mkdir(level3)
2681
2682 file1 = os.path.abspath(os.path.join(level1, "file1"))
2683 create_file(file1)
2684
2685 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002686 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002687 os.chdir(level2)
2688 link = os.path.join(level2, "link")
2689 os.symlink(os.path.relpath(file1), "link")
2690 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002691
Victor Stinnerae39d232016-03-24 17:12:55 +01002692 # Check os.stat calls from the same dir as the link
2693 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002694
Victor Stinnerae39d232016-03-24 17:12:55 +01002695 # Check os.stat calls from a dir below the link
2696 os.chdir(level1)
2697 self.assertEqual(os.stat(file1),
2698 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002699
Victor Stinnerae39d232016-03-24 17:12:55 +01002700 # Check os.stat calls from a dir above the link
2701 os.chdir(level3)
2702 self.assertEqual(os.stat(file1),
2703 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002704 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002705 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002706
SSE43c34aad2018-02-13 00:10:35 +07002707 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2708 and os.path.exists(r'C:\ProgramData'),
2709 'Test directories not found')
2710 def test_29248(self):
2711 # os.symlink() calls CreateSymbolicLink, which creates
2712 # the reparse data buffer with the print name stored
2713 # first, so the offset is always 0. CreateSymbolicLink
2714 # stores the "PrintName" DOS path (e.g. "C:\") first,
2715 # with an offset of 0, followed by the "SubstituteName"
2716 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2717 # the other hand, seems to have been created manually
2718 # with an inverted order.
2719 target = os.readlink(r'C:\Users\All Users')
2720 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2721
Steve Dower6921e732018-03-05 14:26:08 -08002722 def test_buffer_overflow(self):
2723 # Older versions would have a buffer overflow when detecting
2724 # whether a link source was a directory. This test ensures we
2725 # no longer crash, but does not otherwise validate the behavior
2726 segment = 'X' * 27
2727 path = os.path.join(*[segment] * 10)
2728 test_cases = [
2729 # overflow with absolute src
2730 ('\\' + path, segment),
2731 # overflow dest with relative src
2732 (segment, path),
2733 # overflow when joining src
2734 (path[:180], path[:180]),
2735 ]
2736 for src, dest in test_cases:
2737 try:
2738 os.symlink(src, dest)
2739 except FileNotFoundError:
2740 pass
2741 else:
2742 try:
2743 os.remove(dest)
2744 except OSError:
2745 pass
2746 # Also test with bytes, since that is a separate code path.
2747 try:
2748 os.symlink(os.fsencode(src), os.fsencode(dest))
2749 except FileNotFoundError:
2750 pass
2751 else:
2752 try:
2753 os.remove(dest)
2754 except OSError:
2755 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002756
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002757 def test_appexeclink(self):
2758 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002759 if not os.path.isdir(root):
2760 self.skipTest("test requires a WindowsApps directory")
2761
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002762 aliases = [os.path.join(root, a)
2763 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2764
2765 for alias in aliases:
2766 if support.verbose:
2767 print()
2768 print("Testing with", alias)
2769 st = os.lstat(alias)
2770 self.assertEqual(st, os.stat(alias))
2771 self.assertFalse(stat.S_ISLNK(st.st_mode))
2772 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2773 # testing the first one we see is sufficient
2774 break
2775 else:
2776 self.skipTest("test requires an app execution alias")
2777
Tim Golden0321cf22014-05-05 19:46:17 +01002778@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2779class Win32JunctionTests(unittest.TestCase):
2780 junction = 'junctiontest'
2781 junction_target = os.path.dirname(os.path.abspath(__file__))
2782
2783 def setUp(self):
2784 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002785 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002786
2787 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002788 if os.path.lexists(self.junction):
2789 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002790
2791 def test_create_junction(self):
2792 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002793 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002794 self.assertTrue(os.path.exists(self.junction))
2795 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002796 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2797 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002798
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002799 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002800 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002801 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2802 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002803
2804 def test_unlink_removes_junction(self):
2805 _winapi.CreateJunction(self.junction_target, self.junction)
2806 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002807 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002808
2809 os.unlink(self.junction)
2810 self.assertFalse(os.path.exists(self.junction))
2811
Mark Becwarb82bfac2019-02-02 16:08:23 -05002812@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2813class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002814 def test_getfinalpathname_handles(self):
Hai Shid94af3f2020-08-08 17:32:41 +08002815 nt = import_helper.import_module('nt')
2816 ctypes = import_helper.import_module('ctypes')
Berker Peksag6ef726a2019-04-22 18:46:28 +03002817 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002818
2819 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2820 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2821
2822 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2823 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2824 ctypes.wintypes.LPDWORD)
2825
2826 # This is a pseudo-handle that doesn't need to be closed
2827 hproc = kernel.GetCurrentProcess()
2828
2829 handle_count = ctypes.wintypes.DWORD()
2830 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2831 self.assertEqual(1, ok)
2832
2833 before_count = handle_count.value
2834
2835 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002836 filenames = [
2837 r'\\?\C:',
2838 r'\\?\NUL',
2839 r'\\?\CONIN',
2840 __file__,
2841 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002842
Berker Peksag6ef726a2019-04-22 18:46:28 +03002843 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002844 for name in filenames:
2845 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002846 nt._getfinalpathname(name)
2847 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002848 # Failure is expected
2849 pass
2850 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002851 os.stat(name)
2852 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002853 pass
2854
2855 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2856 self.assertEqual(1, ok)
2857
2858 handle_delta = handle_count.value - before_count
2859
2860 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002861
Hai Shi0c4f0f32020-06-30 21:46:31 +08002862@os_helper.skip_unless_symlink
Jason R. Coombs3a092862013-05-27 23:21:28 -04002863class NonLocalSymlinkTests(unittest.TestCase):
2864
2865 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002866 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002867 Create this structure:
2868
2869 base
2870 \___ some_dir
2871 """
2872 os.makedirs('base/some_dir')
2873
2874 def tearDown(self):
2875 shutil.rmtree('base')
2876
2877 def test_directory_link_nonlocal(self):
2878 """
2879 The symlink target should resolve relative to the link, not relative
2880 to the current directory.
2881
2882 Then, link base/some_link -> base/some_dir and ensure that some_link
2883 is resolved as a directory.
2884
2885 In issue13772, it was discovered that directory detection failed if
2886 the symlink target was not specified relative to the current
2887 directory, which was a defect in the implementation.
2888 """
2889 src = os.path.join('base', 'some_link')
2890 os.symlink('some_dir', src)
2891 assert os.path.isdir(src)
2892
2893
Victor Stinnere8d51452010-08-19 01:05:19 +00002894class FSEncodingTests(unittest.TestCase):
2895 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002896 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2897 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002898
Victor Stinnere8d51452010-08-19 01:05:19 +00002899 def test_identity(self):
2900 # assert fsdecode(fsencode(x)) == x
2901 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2902 try:
2903 bytesfn = os.fsencode(fn)
2904 except UnicodeEncodeError:
2905 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002906 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002907
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002908
Brett Cannonefb00c02012-02-29 18:31:31 -05002909
2910class DeviceEncodingTests(unittest.TestCase):
2911
2912 def test_bad_fd(self):
2913 # Return None when an fd doesn't actually exist.
2914 self.assertIsNone(os.device_encoding(123456))
2915
Paul Monson62dfd7d2019-04-25 11:36:45 -07002916 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002917 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002918 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002919 def test_device_encoding(self):
2920 encoding = os.device_encoding(0)
2921 self.assertIsNotNone(encoding)
2922 self.assertTrue(codecs.lookup(encoding))
2923
2924
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002925class PidTests(unittest.TestCase):
2926 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2927 def test_getppid(self):
2928 p = subprocess.Popen([sys.executable, '-c',
2929 'import os; print(os.getppid())'],
2930 stdout=subprocess.PIPE)
2931 stdout, _ = p.communicate()
2932 # We are the parent of our subprocess
2933 self.assertEqual(int(stdout), os.getpid())
2934
Victor Stinner9bee32b2020-04-22 16:30:35 +02002935 def check_waitpid(self, code, exitcode, callback=None):
2936 if sys.platform == 'win32':
2937 # On Windows, os.spawnv() simply joins arguments with spaces:
2938 # arguments need to be quoted
2939 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2940 else:
2941 args = [sys.executable, '-c', code]
2942 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002943
Victor Stinner9bee32b2020-04-22 16:30:35 +02002944 if callback is not None:
2945 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002946
Victor Stinner9bee32b2020-04-22 16:30:35 +02002947 # don't use support.wait_process() to test directly os.waitpid()
2948 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002949 pid2, status = os.waitpid(pid, 0)
2950 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2951 self.assertEqual(pid2, pid)
2952
Victor Stinner9bee32b2020-04-22 16:30:35 +02002953 def test_waitpid(self):
2954 self.check_waitpid(code='pass', exitcode=0)
2955
2956 def test_waitstatus_to_exitcode(self):
2957 exitcode = 23
2958 code = f'import sys; sys.exit({exitcode})'
2959 self.check_waitpid(code, exitcode=exitcode)
2960
2961 with self.assertRaises(TypeError):
2962 os.waitstatus_to_exitcode(0.0)
2963
2964 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2965 def test_waitpid_windows(self):
2966 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2967 # with exit code larger than INT_MAX.
2968 STATUS_CONTROL_C_EXIT = 0xC000013A
2969 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2970 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2971
2972 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2973 def test_waitstatus_to_exitcode_windows(self):
2974 max_exitcode = 2 ** 32 - 1
2975 for exitcode in (0, 1, 5, max_exitcode):
2976 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2977 exitcode)
2978
2979 # invalid values
2980 with self.assertRaises(ValueError):
2981 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2982 with self.assertRaises(OverflowError):
2983 os.waitstatus_to_exitcode(-1)
2984
Victor Stinner65a796e2020-04-01 18:49:29 +02002985 # Skip the test on Windows
2986 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2987 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002988 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002989 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002990
Victor Stinner9bee32b2020-04-22 16:30:35 +02002991 def kill_process(pid):
2992 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002993
Victor Stinner9bee32b2020-04-22 16:30:35 +02002994 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002995
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002996
Victor Stinner4659ccf2016-09-14 10:57:00 +02002997class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002998 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002999 self.exitcode = 17
3000
Hai Shi0c4f0f32020-06-30 21:46:31 +08003001 filename = os_helper.TESTFN
3002 self.addCleanup(os_helper.unlink, filename)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003003
3004 if not with_env:
3005 code = 'import sys; sys.exit(%s)' % self.exitcode
3006 else:
3007 self.env = dict(os.environ)
3008 # create an unique key
3009 self.key = str(uuid.uuid4())
3010 self.env[self.key] = self.key
3011 # read the variable from os.environ to check that it exists
3012 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
3013 % (self.key, self.exitcode))
3014
Inada Naokia6925652021-04-29 11:35:36 +09003015 with open(filename, "w", encoding="utf-8") as fp:
Victor Stinner4659ccf2016-09-14 10:57:00 +02003016 fp.write(code)
3017
Berker Peksag81816462016-09-15 20:19:47 +03003018 args = [sys.executable, filename]
3019 if use_bytes:
3020 args = [os.fsencode(a) for a in args]
3021 self.env = {os.fsencode(k): os.fsencode(v)
3022 for k, v in self.env.items()}
3023
3024 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02003025
Berker Peksag4af23d72016-09-15 20:32:44 +03003026 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003027 def test_spawnl(self):
3028 args = self.create_args()
3029 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
3030 self.assertEqual(exitcode, self.exitcode)
3031
Berker Peksag4af23d72016-09-15 20:32:44 +03003032 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003033 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003034 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003035 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
3036 self.assertEqual(exitcode, self.exitcode)
3037
Berker Peksag4af23d72016-09-15 20:32:44 +03003038 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003039 def test_spawnlp(self):
3040 args = self.create_args()
3041 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
3042 self.assertEqual(exitcode, self.exitcode)
3043
Berker Peksag4af23d72016-09-15 20:32:44 +03003044 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003045 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003046 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003047 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
3048 self.assertEqual(exitcode, self.exitcode)
3049
Berker Peksag4af23d72016-09-15 20:32:44 +03003050 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003051 def test_spawnv(self):
3052 args = self.create_args()
3053 exitcode = os.spawnv(os.P_WAIT, args[0], args)
3054 self.assertEqual(exitcode, self.exitcode)
3055
Victor Stinner9bee32b2020-04-22 16:30:35 +02003056 # Test for PyUnicode_FSConverter()
3057 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
3058 self.assertEqual(exitcode, self.exitcode)
3059
Berker Peksag4af23d72016-09-15 20:32:44 +03003060 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003061 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003062 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003063 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3064 self.assertEqual(exitcode, self.exitcode)
3065
Berker Peksag4af23d72016-09-15 20:32:44 +03003066 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003067 def test_spawnvp(self):
3068 args = self.create_args()
3069 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
3070 self.assertEqual(exitcode, self.exitcode)
3071
Berker Peksag4af23d72016-09-15 20:32:44 +03003072 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003073 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003074 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003075 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
3076 self.assertEqual(exitcode, self.exitcode)
3077
Berker Peksag4af23d72016-09-15 20:32:44 +03003078 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003079 def test_nowait(self):
3080 args = self.create_args()
3081 pid = os.spawnv(os.P_NOWAIT, args[0], args)
Victor Stinner278c1e12020-03-31 20:08:12 +02003082 support.wait_process(pid, exitcode=self.exitcode)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003083
Berker Peksag4af23d72016-09-15 20:32:44 +03003084 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03003085 def test_spawnve_bytes(self):
3086 # Test bytes handling in parse_arglist and parse_envlist (#28114)
3087 args = self.create_args(with_env=True, use_bytes=True)
3088 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3089 self.assertEqual(exitcode, self.exitcode)
3090
Steve Dower859fd7b2016-11-19 18:53:19 -08003091 @requires_os_func('spawnl')
3092 def test_spawnl_noargs(self):
3093 args = self.create_args()
3094 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08003095 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08003096
3097 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08003098 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08003099 args = self.create_args()
3100 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08003101 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08003102
3103 @requires_os_func('spawnv')
3104 def test_spawnv_noargs(self):
3105 args = self.create_args()
3106 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
3107 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08003108 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
3109 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08003110
3111 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08003112 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08003113 args = self.create_args()
3114 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
3115 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08003116 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
3117 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02003118
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003119 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03003120 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003121
Ville Skyttä49b27342017-08-03 09:00:59 +03003122 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003123 newenv = os.environ.copy()
3124 newenv["FRUIT\0VEGETABLE"] = "cabbage"
3125 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003126 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003127 except ValueError:
3128 pass
3129 else:
3130 self.assertEqual(exitcode, 127)
3131
Ville Skyttä49b27342017-08-03 09:00:59 +03003132 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03003133 newenv = os.environ.copy()
3134 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
3135 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003136 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003137 except ValueError:
3138 pass
3139 else:
3140 self.assertEqual(exitcode, 127)
3141
Ville Skyttä49b27342017-08-03 09:00:59 +03003142 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003143 newenv = os.environ.copy()
3144 newenv["FRUIT=ORANGE"] = "lemon"
3145 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003146 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003147 except ValueError:
3148 pass
3149 else:
3150 self.assertEqual(exitcode, 127)
3151
Ville Skyttä49b27342017-08-03 09:00:59 +03003152 # equal character in the environment variable value
Hai Shi0c4f0f32020-06-30 21:46:31 +08003153 filename = os_helper.TESTFN
3154 self.addCleanup(os_helper.unlink, filename)
Inada Naokia6925652021-04-29 11:35:36 +09003155 with open(filename, "w", encoding="utf-8") as fp:
Serhiy Storchaka77703942017-06-25 07:33:01 +03003156 fp.write('import sys, os\n'
3157 'if os.getenv("FRUIT") != "orange=lemon":\n'
3158 ' raise AssertionError')
3159 args = [sys.executable, filename]
3160 newenv = os.environ.copy()
3161 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003162 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003163 self.assertEqual(exitcode, 0)
3164
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003165 @requires_os_func('spawnve')
3166 def test_spawnve_invalid_env(self):
3167 self._test_invalid_env(os.spawnve)
3168
3169 @requires_os_func('spawnvpe')
3170 def test_spawnvpe_invalid_env(self):
3171 self._test_invalid_env(os.spawnvpe)
3172
Serhiy Storchaka77703942017-06-25 07:33:01 +03003173
Brian Curtin0151b8e2010-09-24 13:43:43 +00003174# The introduction of this TestCase caused at least two different errors on
3175# *nix buildbots. Temporarily skip this to let the buildbots move along.
3176@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00003177@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3178class LoginTests(unittest.TestCase):
3179 def test_getlogin(self):
3180 user_name = os.getlogin()
3181 self.assertNotEqual(len(user_name), 0)
3182
3183
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003184@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3185 "needs os.getpriority and os.setpriority")
3186class ProgramPriorityTests(unittest.TestCase):
3187 """Tests for os.getpriority() and os.setpriority()."""
3188
3189 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003190
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003191 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3192 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3193 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003194 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3195 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01003196 raise unittest.SkipTest("unable to reliably test setpriority "
3197 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003198 else:
3199 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003200 finally:
3201 try:
3202 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3203 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00003204 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003205 raise
3206
3207
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003208class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003209
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003210 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003211
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003212 def __init__(self, conn):
3213 asynchat.async_chat.__init__(self, conn)
3214 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003215 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 self.closed = False
3217 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003218
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 def handle_read(self):
3220 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003221 if self.accumulate:
3222 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003223
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003224 def get_data(self):
3225 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003226
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003227 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003228 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003229 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003230
3231 def handle_error(self):
3232 raise
3233
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 def __init__(self, address):
3235 threading.Thread.__init__(self)
3236 asyncore.dispatcher.__init__(self)
3237 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3238 self.bind(address)
3239 self.listen(5)
3240 self.host, self.port = self.socket.getsockname()[:2]
3241 self.handler_instance = None
3242 self._active = False
3243 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003244
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003245 # --- public API
3246
3247 @property
3248 def running(self):
3249 return self._active
3250
3251 def start(self):
3252 assert not self.running
3253 self.__flag = threading.Event()
3254 threading.Thread.start(self)
3255 self.__flag.wait()
3256
3257 def stop(self):
3258 assert self.running
3259 self._active = False
3260 self.join()
3261
3262 def wait(self):
3263 # wait for handler connection to be closed, then stop the server
3264 while not getattr(self.handler_instance, "closed", False):
3265 time.sleep(0.001)
3266 self.stop()
3267
3268 # --- internals
3269
3270 def run(self):
3271 self._active = True
3272 self.__flag.set()
3273 while self._active and asyncore.socket_map:
3274 self._active_lock.acquire()
3275 asyncore.loop(timeout=0.001, count=1)
3276 self._active_lock.release()
3277 asyncore.close_all()
3278
3279 def handle_accept(self):
3280 conn, addr = self.accept()
3281 self.handler_instance = self.Handler(conn)
3282
3283 def handle_connect(self):
3284 self.close()
3285 handle_read = handle_connect
3286
3287 def writable(self):
3288 return 0
3289
3290 def handle_error(self):
3291 raise
3292
3293
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003294@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3295class TestSendfile(unittest.TestCase):
3296
Victor Stinner8c663fd2017-11-08 14:44:44 -08003297 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003298 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003299 not sys.platform.startswith("solaris") and \
3300 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003301 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3302 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003303 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3304 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003305
3306 @classmethod
3307 def setUpClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003308 cls.key = threading_helper.threading_setup()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003309 create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003310
3311 @classmethod
3312 def tearDownClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003313 threading_helper.threading_cleanup(*cls.key)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003314 os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003315
3316 def setUp(self):
Serhiy Storchaka16994912020-04-25 10:06:29 +03003317 self.server = SendfileTestServer((socket_helper.HOST, 0))
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003318 self.server.start()
3319 self.client = socket.socket()
3320 self.client.connect((self.server.host, self.server.port))
3321 self.client.settimeout(1)
3322 # synchronize by waiting for "220 ready" response
3323 self.client.recv(1024)
3324 self.sockno = self.client.fileno()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003325 self.file = open(os_helper.TESTFN, 'rb')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003326 self.fileno = self.file.fileno()
3327
3328 def tearDown(self):
3329 self.file.close()
3330 self.client.close()
3331 if self.server.running:
3332 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003333 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003334
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003335 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003336 """A higher level wrapper representing how an application is
3337 supposed to use sendfile().
3338 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003339 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003340 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003341 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003342 except OSError as err:
3343 if err.errno == errno.ECONNRESET:
3344 # disconnected
3345 raise
3346 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3347 # we have to retry send data
3348 continue
3349 else:
3350 raise
3351
3352 def test_send_whole_file(self):
3353 # normal send
3354 total_sent = 0
3355 offset = 0
3356 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003357 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003358 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3359 if sent == 0:
3360 break
3361 offset += sent
3362 total_sent += sent
3363 self.assertTrue(sent <= nbytes)
3364 self.assertEqual(offset, total_sent)
3365
3366 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003367 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003368 self.client.close()
3369 self.server.wait()
3370 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003371 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003372 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003373
3374 def test_send_at_certain_offset(self):
3375 # start sending a file at a certain offset
3376 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003377 offset = len(self.DATA) // 2
3378 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003379 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003380 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003381 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3382 if sent == 0:
3383 break
3384 offset += sent
3385 total_sent += sent
3386 self.assertTrue(sent <= nbytes)
3387
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003388 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003389 self.client.close()
3390 self.server.wait()
3391 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003392 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003393 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003394 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003395 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003396
3397 def test_offset_overflow(self):
3398 # specify an offset > file size
3399 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003400 try:
3401 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3402 except OSError as e:
3403 # Solaris can raise EINVAL if offset >= file length, ignore.
3404 if e.errno != errno.EINVAL:
3405 raise
3406 else:
3407 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003408 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003409 self.client.close()
3410 self.server.wait()
3411 data = self.server.handler_instance.get_data()
3412 self.assertEqual(data, b'')
3413
3414 def test_invalid_offset(self):
3415 with self.assertRaises(OSError) as cm:
3416 os.sendfile(self.sockno, self.fileno, -1, 4096)
3417 self.assertEqual(cm.exception.errno, errno.EINVAL)
3418
Martin Panterbf19d162015-09-09 01:01:13 +00003419 def test_keywords(self):
3420 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003421 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3422 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003423 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003424 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3425 offset=0, count=4096,
3426 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003427
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003428 # --- headers / trailers tests
3429
Serhiy Storchaka43767632013-11-03 21:31:38 +02003430 @requires_headers_trailers
3431 def test_headers(self):
3432 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003433 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003434 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003435 headers=[b"x" * 512, b"y" * 256])
3436 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003437 total_sent += sent
3438 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003439 while total_sent < len(expected_data):
3440 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003441 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3442 offset, nbytes)
3443 if sent == 0:
3444 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003445 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003446 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003447 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003448
Serhiy Storchaka43767632013-11-03 21:31:38 +02003449 self.assertEqual(total_sent, len(expected_data))
3450 self.client.close()
3451 self.server.wait()
3452 data = self.server.handler_instance.get_data()
3453 self.assertEqual(hash(data), hash(expected_data))
3454
3455 @requires_headers_trailers
3456 def test_trailers(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003457 TESTFN2 = os_helper.TESTFN + "2"
Serhiy Storchaka43767632013-11-03 21:31:38 +02003458 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003459
Hai Shi0c4f0f32020-06-30 21:46:31 +08003460 self.addCleanup(os_helper.unlink, TESTFN2)
Victor Stinnerae39d232016-03-24 17:12:55 +01003461 create_file(TESTFN2, file_data)
3462
3463 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003464 os.sendfile(self.sockno, f.fileno(), 0, 5,
3465 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003466 self.client.close()
3467 self.server.wait()
3468 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003469 self.assertEqual(data, b"abcde123456789")
3470
3471 @requires_headers_trailers
3472 @requires_32b
3473 def test_headers_overflow_32bits(self):
3474 self.server.handler_instance.accumulate = False
3475 with self.assertRaises(OSError) as cm:
3476 os.sendfile(self.sockno, self.fileno, 0, 0,
3477 headers=[b"x" * 2**16] * 2**15)
3478 self.assertEqual(cm.exception.errno, errno.EINVAL)
3479
3480 @requires_headers_trailers
3481 @requires_32b
3482 def test_trailers_overflow_32bits(self):
3483 self.server.handler_instance.accumulate = False
3484 with self.assertRaises(OSError) as cm:
3485 os.sendfile(self.sockno, self.fileno, 0, 0,
3486 trailers=[b"x" * 2**16] * 2**15)
3487 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003488
Serhiy Storchaka43767632013-11-03 21:31:38 +02003489 @requires_headers_trailers
3490 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3491 'test needs os.SF_NODISKIO')
3492 def test_flags(self):
3493 try:
3494 os.sendfile(self.sockno, self.fileno, 0, 4096,
3495 flags=os.SF_NODISKIO)
3496 except OSError as err:
3497 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3498 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003499
3500
Larry Hastings9cf065c2012-06-22 16:30:09 -07003501def supports_extended_attributes():
3502 if not hasattr(os, "setxattr"):
3503 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003504
Larry Hastings9cf065c2012-06-22 16:30:09 -07003505 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003506 with open(os_helper.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003507 try:
3508 os.setxattr(fp.fileno(), b"user.test", b"")
3509 except OSError:
3510 return False
3511 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003512 os_helper.unlink(os_helper.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003513
3514 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003515
3516
3517@unittest.skipUnless(supports_extended_attributes(),
3518 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003519# Kernels < 2.6.39 don't respect setxattr flags.
3520@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003521class ExtendedAttributeTests(unittest.TestCase):
3522
Larry Hastings9cf065c2012-06-22 16:30:09 -07003523 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003524 fn = os_helper.TESTFN
3525 self.addCleanup(os_helper.unlink, fn)
Victor Stinnerae39d232016-03-24 17:12:55 +01003526 create_file(fn)
3527
Benjamin Peterson799bd802011-08-31 22:15:17 -04003528 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003529 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003530 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003531
Victor Stinnerf12e5062011-10-16 22:12:03 +02003532 init_xattr = listxattr(fn)
3533 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003534
Larry Hastings9cf065c2012-06-22 16:30:09 -07003535 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003536 xattr = set(init_xattr)
3537 xattr.add("user.test")
3538 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003539 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3540 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3541 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003542
Benjamin Peterson799bd802011-08-31 22:15:17 -04003543 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003544 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003545 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003546
Benjamin Peterson799bd802011-08-31 22:15:17 -04003547 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003548 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003549 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003550
Larry Hastings9cf065c2012-06-22 16:30:09 -07003551 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003552 xattr.add("user.test2")
3553 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003554 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003555
Benjamin Peterson799bd802011-08-31 22:15:17 -04003556 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003557 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003558 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003559
Victor Stinnerf12e5062011-10-16 22:12:03 +02003560 xattr.remove("user.test")
3561 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003562 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3563 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3564 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3565 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003566 many = sorted("user.test{}".format(i) for i in range(100))
3567 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003568 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003569 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003570
Larry Hastings9cf065c2012-06-22 16:30:09 -07003571 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003572 self._check_xattrs_str(str, *args, **kwargs)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003573 os_helper.unlink(os_helper.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003574
3575 self._check_xattrs_str(os.fsencode, *args, **kwargs)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003576 os_helper.unlink(os_helper.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003577
3578 def test_simple(self):
3579 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3580 os.listxattr)
3581
3582 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003583 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3584 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003585
3586 def test_fds(self):
3587 def getxattr(path, *args):
3588 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003589 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003590 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003591 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003592 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003593 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003594 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003595 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003596 def listxattr(path, *args):
3597 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003598 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003599 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3600
3601
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003602@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3603class TermsizeTests(unittest.TestCase):
3604 def test_does_not_crash(self):
3605 """Check if get_terminal_size() returns a meaningful value.
3606
3607 There's no easy portable way to actually check the size of the
3608 terminal, so let's check if it returns something sensible instead.
3609 """
3610 try:
3611 size = os.get_terminal_size()
3612 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003613 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003614 # Under win32 a generic OSError can be thrown if the
3615 # handle cannot be retrieved
3616 self.skipTest("failed to query terminal size")
3617 raise
3618
Antoine Pitroucfade362012-02-08 23:48:59 +01003619 self.assertGreaterEqual(size.columns, 0)
3620 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003621
3622 def test_stty_match(self):
3623 """Check if stty returns the same results
3624
3625 stty actually tests stdin, so get_terminal_size is invoked on
3626 stdin explicitly. If stty succeeded, then get_terminal_size()
3627 should work too.
3628 """
3629 try:
Batuhan Taskayad5a980a2020-05-17 01:38:02 +03003630 size = (
3631 subprocess.check_output(
3632 ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3633 ).split()
3634 )
xdegaye6a55d092017-11-12 17:57:04 +01003635 except (FileNotFoundError, subprocess.CalledProcessError,
3636 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003637 self.skipTest("stty invocation failed")
3638 expected = (int(size[1]), int(size[0])) # reversed order
3639
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003640 try:
3641 actual = os.get_terminal_size(sys.__stdin__.fileno())
3642 except OSError as e:
3643 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3644 # Under win32 a generic OSError can be thrown if the
3645 # handle cannot be retrieved
3646 self.skipTest("failed to query terminal size")
3647 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003648 self.assertEqual(expected, actual)
3649
3650
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003651@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003652@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003653class MemfdCreateTests(unittest.TestCase):
3654 def test_memfd_create(self):
3655 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3656 self.assertNotEqual(fd, -1)
3657 self.addCleanup(os.close, fd)
3658 self.assertFalse(os.get_inheritable(fd))
3659 with open(fd, "wb", closefd=False) as f:
3660 f.write(b'memfd_create')
3661 self.assertEqual(f.tell(), 12)
3662
3663 fd2 = os.memfd_create("Hi")
3664 self.addCleanup(os.close, fd2)
3665 self.assertFalse(os.get_inheritable(fd2))
3666
3667
Christian Heimescd9fed62020-11-13 19:48:52 +01003668@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
3669@support.requires_linux_version(2, 6, 30)
3670class EventfdTests(unittest.TestCase):
3671 def test_eventfd_initval(self):
3672 def pack(value):
3673 """Pack as native uint64_t
3674 """
3675 return struct.pack("@Q", value)
3676 size = 8 # read/write 8 bytes
3677 initval = 42
3678 fd = os.eventfd(initval)
3679 self.assertNotEqual(fd, -1)
3680 self.addCleanup(os.close, fd)
3681 self.assertFalse(os.get_inheritable(fd))
3682
3683 # test with raw read/write
3684 res = os.read(fd, size)
3685 self.assertEqual(res, pack(initval))
3686
3687 os.write(fd, pack(23))
3688 res = os.read(fd, size)
3689 self.assertEqual(res, pack(23))
3690
3691 os.write(fd, pack(40))
3692 os.write(fd, pack(2))
3693 res = os.read(fd, size)
3694 self.assertEqual(res, pack(42))
3695
3696 # test with eventfd_read/eventfd_write
3697 os.eventfd_write(fd, 20)
3698 os.eventfd_write(fd, 3)
3699 res = os.eventfd_read(fd)
3700 self.assertEqual(res, 23)
3701
3702 def test_eventfd_semaphore(self):
3703 initval = 2
3704 flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
3705 fd = os.eventfd(initval, flags)
3706 self.assertNotEqual(fd, -1)
3707 self.addCleanup(os.close, fd)
3708
3709 # semaphore starts has initval 2, two reads return '1'
3710 res = os.eventfd_read(fd)
3711 self.assertEqual(res, 1)
3712 res = os.eventfd_read(fd)
3713 self.assertEqual(res, 1)
3714 # third read would block
3715 with self.assertRaises(BlockingIOError):
3716 os.eventfd_read(fd)
3717 with self.assertRaises(BlockingIOError):
3718 os.read(fd, 8)
3719
3720 # increase semaphore counter, read one
3721 os.eventfd_write(fd, 1)
3722 res = os.eventfd_read(fd)
3723 self.assertEqual(res, 1)
3724 # next read would block, too
3725 with self.assertRaises(BlockingIOError):
3726 os.eventfd_read(fd)
3727
3728 def test_eventfd_select(self):
3729 flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
3730 fd = os.eventfd(0, flags)
3731 self.assertNotEqual(fd, -1)
3732 self.addCleanup(os.close, fd)
3733
3734 # counter is zero, only writeable
3735 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3736 self.assertEqual((rfd, wfd, xfd), ([], [fd], []))
3737
3738 # counter is non-zero, read and writeable
3739 os.eventfd_write(fd, 23)
3740 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3741 self.assertEqual((rfd, wfd, xfd), ([fd], [fd], []))
3742 self.assertEqual(os.eventfd_read(fd), 23)
3743
3744 # counter at max, only readable
3745 os.eventfd_write(fd, (2**64) - 2)
3746 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3747 self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
3748 os.eventfd_read(fd)
3749
3750
Victor Stinner292c8352012-10-30 02:17:38 +01003751class OSErrorTests(unittest.TestCase):
3752 def setUp(self):
3753 class Str(str):
3754 pass
3755
Victor Stinnerafe17062012-10-31 22:47:43 +01003756 self.bytes_filenames = []
3757 self.unicode_filenames = []
Hai Shi0c4f0f32020-06-30 21:46:31 +08003758 if os_helper.TESTFN_UNENCODABLE is not None:
3759 decoded = os_helper.TESTFN_UNENCODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003760 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003761 decoded = os_helper.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003762 self.unicode_filenames.append(decoded)
3763 self.unicode_filenames.append(Str(decoded))
Hai Shi0c4f0f32020-06-30 21:46:31 +08003764 if os_helper.TESTFN_UNDECODABLE is not None:
3765 encoded = os_helper.TESTFN_UNDECODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003766 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003767 encoded = os.fsencode(os_helper.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003768 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003769 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003770 self.bytes_filenames.append(memoryview(encoded))
3771
3772 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003773
3774 def test_oserror_filename(self):
3775 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003776 (self.filenames, os.chdir,),
3777 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003778 (self.filenames, os.lstat,),
3779 (self.filenames, os.open, os.O_RDONLY),
3780 (self.filenames, os.rmdir,),
3781 (self.filenames, os.stat,),
3782 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003783 ]
3784 if sys.platform == "win32":
3785 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003786 (self.bytes_filenames, os.rename, b"dst"),
3787 (self.bytes_filenames, os.replace, b"dst"),
3788 (self.unicode_filenames, os.rename, "dst"),
3789 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003790 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003791 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003792 else:
3793 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003794 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003795 (self.filenames, os.rename, "dst"),
3796 (self.filenames, os.replace, "dst"),
3797 ))
3798 if hasattr(os, "chown"):
3799 funcs.append((self.filenames, os.chown, 0, 0))
3800 if hasattr(os, "lchown"):
3801 funcs.append((self.filenames, os.lchown, 0, 0))
3802 if hasattr(os, "truncate"):
3803 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003804 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003805 funcs.append((self.filenames, os.chflags, 0))
3806 if hasattr(os, "lchflags"):
3807 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003808 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003809 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003810 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003811 if sys.platform == "win32":
3812 funcs.append((self.bytes_filenames, os.link, b"dst"))
3813 funcs.append((self.unicode_filenames, os.link, "dst"))
3814 else:
3815 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003816 if hasattr(os, "listxattr"):
3817 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003818 (self.filenames, os.listxattr,),
3819 (self.filenames, os.getxattr, "user.test"),
3820 (self.filenames, os.setxattr, "user.test", b'user'),
3821 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003822 ))
3823 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003824 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003825 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003826 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003827
Steve Dowercc16be82016-09-08 10:35:16 -07003828
Victor Stinnerafe17062012-10-31 22:47:43 +01003829 for filenames, func, *func_args in funcs:
3830 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003831 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003832 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003833 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003834 else:
3835 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3836 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003837 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003838 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003839 except UnicodeDecodeError:
3840 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003841 else:
3842 self.fail("No exception thrown by {}".format(func))
3843
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003844class CPUCountTests(unittest.TestCase):
3845 def test_cpu_count(self):
3846 cpus = os.cpu_count()
3847 if cpus is not None:
3848 self.assertIsInstance(cpus, int)
3849 self.assertGreater(cpus, 0)
3850 else:
3851 self.skipTest("Could not determine the number of CPUs")
3852
Victor Stinnerdaf45552013-08-28 00:53:59 +02003853
3854class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003855 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003856 fd = os.open(__file__, os.O_RDONLY)
3857 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003858 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003859
Victor Stinnerdaf45552013-08-28 00:53:59 +02003860 os.set_inheritable(fd, True)
3861 self.assertEqual(os.get_inheritable(fd), True)
3862
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003863 @unittest.skipIf(fcntl is None, "need fcntl")
3864 def test_get_inheritable_cloexec(self):
3865 fd = os.open(__file__, os.O_RDONLY)
3866 self.addCleanup(os.close, fd)
3867 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003868
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003869 # clear FD_CLOEXEC flag
3870 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3871 flags &= ~fcntl.FD_CLOEXEC
3872 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003873
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003874 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003875
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003876 @unittest.skipIf(fcntl is None, "need fcntl")
3877 def test_set_inheritable_cloexec(self):
3878 fd = os.open(__file__, os.O_RDONLY)
3879 self.addCleanup(os.close, fd)
3880 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3881 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003882
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003883 os.set_inheritable(fd, True)
3884 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3885 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003886
cptpcrd7dc71c42021-01-20 09:05:51 -05003887 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3888 def test_get_set_inheritable_o_path(self):
3889 fd = os.open(__file__, os.O_PATH)
3890 self.addCleanup(os.close, fd)
3891 self.assertEqual(os.get_inheritable(fd), False)
3892
3893 os.set_inheritable(fd, True)
3894 self.assertEqual(os.get_inheritable(fd), True)
3895
3896 os.set_inheritable(fd, False)
3897 self.assertEqual(os.get_inheritable(fd), False)
3898
3899 def test_get_set_inheritable_badf(self):
3900 fd = os_helper.make_bad_fd()
3901
3902 with self.assertRaises(OSError) as ctx:
3903 os.get_inheritable(fd)
3904 self.assertEqual(ctx.exception.errno, errno.EBADF)
3905
3906 with self.assertRaises(OSError) as ctx:
3907 os.set_inheritable(fd, True)
3908 self.assertEqual(ctx.exception.errno, errno.EBADF)
3909
3910 with self.assertRaises(OSError) as ctx:
3911 os.set_inheritable(fd, False)
3912 self.assertEqual(ctx.exception.errno, errno.EBADF)
3913
Victor Stinnerdaf45552013-08-28 00:53:59 +02003914 def test_open(self):
3915 fd = os.open(__file__, os.O_RDONLY)
3916 self.addCleanup(os.close, fd)
3917 self.assertEqual(os.get_inheritable(fd), False)
3918
3919 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3920 def test_pipe(self):
3921 rfd, wfd = os.pipe()
3922 self.addCleanup(os.close, rfd)
3923 self.addCleanup(os.close, wfd)
3924 self.assertEqual(os.get_inheritable(rfd), False)
3925 self.assertEqual(os.get_inheritable(wfd), False)
3926
3927 def test_dup(self):
3928 fd1 = os.open(__file__, os.O_RDONLY)
3929 self.addCleanup(os.close, fd1)
3930
3931 fd2 = os.dup(fd1)
3932 self.addCleanup(os.close, fd2)
3933 self.assertEqual(os.get_inheritable(fd2), False)
3934
Zackery Spytz5be66602019-08-23 12:38:41 -06003935 def test_dup_standard_stream(self):
3936 fd = os.dup(1)
3937 self.addCleanup(os.close, fd)
3938 self.assertGreater(fd, 0)
3939
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003940 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3941 def test_dup_nul(self):
3942 # os.dup() was creating inheritable fds for character files.
3943 fd1 = os.open('NUL', os.O_RDONLY)
3944 self.addCleanup(os.close, fd1)
3945 fd2 = os.dup(fd1)
3946 self.addCleanup(os.close, fd2)
3947 self.assertFalse(os.get_inheritable(fd2))
3948
Victor Stinnerdaf45552013-08-28 00:53:59 +02003949 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3950 def test_dup2(self):
3951 fd = os.open(__file__, os.O_RDONLY)
3952 self.addCleanup(os.close, fd)
3953
3954 # inheritable by default
3955 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003956 self.addCleanup(os.close, fd2)
3957 self.assertEqual(os.dup2(fd, fd2), fd2)
3958 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003959
3960 # force non-inheritable
3961 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003962 self.addCleanup(os.close, fd3)
3963 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3964 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003965
3966 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3967 def test_openpty(self):
3968 master_fd, slave_fd = os.openpty()
3969 self.addCleanup(os.close, master_fd)
3970 self.addCleanup(os.close, slave_fd)
3971 self.assertEqual(os.get_inheritable(master_fd), False)
3972 self.assertEqual(os.get_inheritable(slave_fd), False)
3973
3974
Brett Cannon3f9183b2016-08-26 14:44:48 -07003975class PathTConverterTests(unittest.TestCase):
3976 # tuples of (function name, allows fd arguments, additional arguments to
3977 # function, cleanup function)
3978 functions = [
3979 ('stat', True, (), None),
3980 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003981 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003982 ('chflags', False, (0,), None),
3983 ('lchflags', False, (0,), None),
3984 ('open', False, (0,), getattr(os, 'close', None)),
3985 ]
3986
3987 def test_path_t_converter(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003988 str_filename = os_helper.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003989 if os.name == 'nt':
3990 bytes_fspath = bytes_filename = None
3991 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003992 bytes_filename = os.fsencode(os_helper.TESTFN)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003993 bytes_fspath = FakePath(bytes_filename)
3994 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003995 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003996 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003997
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003998 int_fspath = FakePath(fd)
3999 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07004000
4001 for name, allow_fd, extra_args, cleanup_fn in self.functions:
4002 with self.subTest(name=name):
4003 try:
4004 fn = getattr(os, name)
4005 except AttributeError:
4006 continue
4007
Brett Cannon8f96a302016-08-26 19:30:11 -07004008 for path in (str_filename, bytes_filename, str_fspath,
4009 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07004010 if path is None:
4011 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07004012 with self.subTest(name=name, path=path):
4013 result = fn(path, *extra_args)
4014 if cleanup_fn is not None:
4015 cleanup_fn(result)
4016
4017 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004018 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07004019 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07004020
4021 if allow_fd:
4022 result = fn(fd, *extra_args) # should not fail
4023 if cleanup_fn is not None:
4024 cleanup_fn(result)
4025 else:
4026 with self.assertRaisesRegex(
4027 TypeError,
4028 'os.PathLike'):
4029 fn(fd, *extra_args)
4030
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004031 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004032 msg = r'__fspath__\(\) to return str or bytes, not %s'
4033 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004034 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004035 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004036 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004037 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004038 os.stat(FakePath(object()))
4039
Brett Cannon3f9183b2016-08-26 14:44:48 -07004040
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004041@unittest.skipUnless(hasattr(os, 'get_blocking'),
4042 'needs os.get_blocking() and os.set_blocking()')
4043class BlockingTests(unittest.TestCase):
4044 def test_blocking(self):
4045 fd = os.open(__file__, os.O_RDONLY)
4046 self.addCleanup(os.close, fd)
4047 self.assertEqual(os.get_blocking(fd), True)
4048
4049 os.set_blocking(fd, False)
4050 self.assertEqual(os.get_blocking(fd), False)
4051
4052 os.set_blocking(fd, True)
4053 self.assertEqual(os.get_blocking(fd), True)
4054
4055
Yury Selivanov97e2e062014-09-26 12:33:06 -04004056
4057class ExportsTests(unittest.TestCase):
4058 def test_os_all(self):
4059 self.assertIn('open', os.__all__)
4060 self.assertIn('walk', os.__all__)
4061
4062
Eddie Elizondob3966632019-11-05 07:16:14 -08004063class TestDirEntry(unittest.TestCase):
4064 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004065 self.path = os.path.realpath(os_helper.TESTFN)
4066 self.addCleanup(os_helper.rmtree, self.path)
Eddie Elizondob3966632019-11-05 07:16:14 -08004067 os.mkdir(self.path)
4068
4069 def test_uninstantiable(self):
4070 self.assertRaises(TypeError, os.DirEntry)
4071
4072 def test_unpickable(self):
4073 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
4074 entry = [entry for entry in os.scandir(self.path)].pop()
4075 self.assertIsInstance(entry, os.DirEntry)
4076 self.assertEqual(entry.name, "file.txt")
4077 import pickle
4078 self.assertRaises(TypeError, pickle.dumps, entry, filename)
4079
4080
Victor Stinner6036e442015-03-08 01:58:04 +01004081class TestScandir(unittest.TestCase):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004082 check_no_resource_warning = warnings_helper.check_no_resource_warning
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004083
Victor Stinner6036e442015-03-08 01:58:04 +01004084 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004085 self.path = os.path.realpath(os_helper.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07004086 self.bytes_path = os.fsencode(self.path)
Hai Shi0c4f0f32020-06-30 21:46:31 +08004087 self.addCleanup(os_helper.rmtree, self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01004088 os.mkdir(self.path)
4089
4090 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07004091 path = self.bytes_path if isinstance(name, bytes) else self.path
4092 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01004093 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01004094 return filename
4095
4096 def get_entries(self, names):
4097 entries = dict((entry.name, entry)
4098 for entry in os.scandir(self.path))
4099 self.assertEqual(sorted(entries.keys()), names)
4100 return entries
4101
4102 def assert_stat_equal(self, stat1, stat2, skip_fields):
4103 if skip_fields:
4104 for attr in dir(stat1):
4105 if not attr.startswith("st_"):
4106 continue
4107 if attr in ("st_dev", "st_ino", "st_nlink"):
4108 continue
4109 self.assertEqual(getattr(stat1, attr),
4110 getattr(stat2, attr),
4111 (stat1, stat2, attr))
4112 else:
4113 self.assertEqual(stat1, stat2)
4114
Eddie Elizondob3966632019-11-05 07:16:14 -08004115 def test_uninstantiable(self):
4116 scandir_iter = os.scandir(self.path)
4117 self.assertRaises(TypeError, type(scandir_iter))
4118 scandir_iter.close()
4119
4120 def test_unpickable(self):
4121 filename = self.create_file("file.txt")
4122 scandir_iter = os.scandir(self.path)
4123 import pickle
4124 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
4125 scandir_iter.close()
4126
Victor Stinner6036e442015-03-08 01:58:04 +01004127 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07004128 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01004129 self.assertEqual(entry.name, name)
4130 self.assertEqual(entry.path, os.path.join(self.path, name))
4131 self.assertEqual(entry.inode(),
4132 os.stat(entry.path, follow_symlinks=False).st_ino)
4133
4134 entry_stat = os.stat(entry.path)
4135 self.assertEqual(entry.is_dir(),
4136 stat.S_ISDIR(entry_stat.st_mode))
4137 self.assertEqual(entry.is_file(),
4138 stat.S_ISREG(entry_stat.st_mode))
4139 self.assertEqual(entry.is_symlink(),
4140 os.path.islink(entry.path))
4141
4142 entry_lstat = os.stat(entry.path, follow_symlinks=False)
4143 self.assertEqual(entry.is_dir(follow_symlinks=False),
4144 stat.S_ISDIR(entry_lstat.st_mode))
4145 self.assertEqual(entry.is_file(follow_symlinks=False),
4146 stat.S_ISREG(entry_lstat.st_mode))
4147
4148 self.assert_stat_equal(entry.stat(),
4149 entry_stat,
4150 os.name == 'nt' and not is_symlink)
4151 self.assert_stat_equal(entry.stat(follow_symlinks=False),
4152 entry_lstat,
4153 os.name == 'nt')
4154
4155 def test_attributes(self):
4156 link = hasattr(os, 'link')
Hai Shi0c4f0f32020-06-30 21:46:31 +08004157 symlink = os_helper.can_symlink()
Victor Stinner6036e442015-03-08 01:58:04 +01004158
4159 dirname = os.path.join(self.path, "dir")
4160 os.mkdir(dirname)
4161 filename = self.create_file("file.txt")
4162 if link:
xdegaye6a55d092017-11-12 17:57:04 +01004163 try:
4164 os.link(filename, os.path.join(self.path, "link_file.txt"))
4165 except PermissionError as e:
4166 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01004167 if symlink:
4168 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
4169 target_is_directory=True)
4170 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
4171
4172 names = ['dir', 'file.txt']
4173 if link:
4174 names.append('link_file.txt')
4175 if symlink:
4176 names.extend(('symlink_dir', 'symlink_file.txt'))
4177 entries = self.get_entries(names)
4178
4179 entry = entries['dir']
4180 self.check_entry(entry, 'dir', True, False, False)
4181
4182 entry = entries['file.txt']
4183 self.check_entry(entry, 'file.txt', False, True, False)
4184
4185 if link:
4186 entry = entries['link_file.txt']
4187 self.check_entry(entry, 'link_file.txt', False, True, False)
4188
4189 if symlink:
4190 entry = entries['symlink_dir']
4191 self.check_entry(entry, 'symlink_dir', True, False, True)
4192
4193 entry = entries['symlink_file.txt']
4194 self.check_entry(entry, 'symlink_file.txt', False, True, True)
4195
4196 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07004197 path = self.bytes_path if isinstance(name, bytes) else self.path
4198 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01004199 self.assertEqual(len(entries), 1)
4200
4201 entry = entries[0]
4202 self.assertEqual(entry.name, name)
4203 return entry
4204
Brett Cannon96881cd2016-06-10 14:37:21 -07004205 def create_file_entry(self, name='file.txt'):
4206 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01004207 return self.get_entry(os.path.basename(filename))
4208
4209 def test_current_directory(self):
4210 filename = self.create_file()
4211 old_dir = os.getcwd()
4212 try:
4213 os.chdir(self.path)
4214
4215 # call scandir() without parameter: it must list the content
4216 # of the current directory
4217 entries = dict((entry.name, entry) for entry in os.scandir())
4218 self.assertEqual(sorted(entries.keys()),
4219 [os.path.basename(filename)])
4220 finally:
4221 os.chdir(old_dir)
4222
4223 def test_repr(self):
4224 entry = self.create_file_entry()
4225 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
4226
Brett Cannon96881cd2016-06-10 14:37:21 -07004227 def test_fspath_protocol(self):
4228 entry = self.create_file_entry()
4229 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
4230
4231 def test_fspath_protocol_bytes(self):
4232 bytes_filename = os.fsencode('bytesfile.txt')
4233 bytes_entry = self.create_file_entry(name=bytes_filename)
4234 fspath = os.fspath(bytes_entry)
4235 self.assertIsInstance(fspath, bytes)
4236 self.assertEqual(fspath,
4237 os.path.join(os.fsencode(self.path),bytes_filename))
4238
Victor Stinner6036e442015-03-08 01:58:04 +01004239 def test_removed_dir(self):
4240 path = os.path.join(self.path, 'dir')
4241
4242 os.mkdir(path)
4243 entry = self.get_entry('dir')
4244 os.rmdir(path)
4245
4246 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4247 if os.name == 'nt':
4248 self.assertTrue(entry.is_dir())
4249 self.assertFalse(entry.is_file())
4250 self.assertFalse(entry.is_symlink())
4251 if os.name == 'nt':
4252 self.assertRaises(FileNotFoundError, entry.inode)
4253 # don't fail
4254 entry.stat()
4255 entry.stat(follow_symlinks=False)
4256 else:
4257 self.assertGreater(entry.inode(), 0)
4258 self.assertRaises(FileNotFoundError, entry.stat)
4259 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4260
4261 def test_removed_file(self):
4262 entry = self.create_file_entry()
4263 os.unlink(entry.path)
4264
4265 self.assertFalse(entry.is_dir())
4266 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4267 if os.name == 'nt':
4268 self.assertTrue(entry.is_file())
4269 self.assertFalse(entry.is_symlink())
4270 if os.name == 'nt':
4271 self.assertRaises(FileNotFoundError, entry.inode)
4272 # don't fail
4273 entry.stat()
4274 entry.stat(follow_symlinks=False)
4275 else:
4276 self.assertGreater(entry.inode(), 0)
4277 self.assertRaises(FileNotFoundError, entry.stat)
4278 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4279
4280 def test_broken_symlink(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004281 if not os_helper.can_symlink():
Victor Stinner6036e442015-03-08 01:58:04 +01004282 return self.skipTest('cannot create symbolic link')
4283
4284 filename = self.create_file("file.txt")
4285 os.symlink(filename,
4286 os.path.join(self.path, "symlink.txt"))
4287 entries = self.get_entries(['file.txt', 'symlink.txt'])
4288 entry = entries['symlink.txt']
4289 os.unlink(filename)
4290
4291 self.assertGreater(entry.inode(), 0)
4292 self.assertFalse(entry.is_dir())
4293 self.assertFalse(entry.is_file()) # broken symlink returns False
4294 self.assertFalse(entry.is_dir(follow_symlinks=False))
4295 self.assertFalse(entry.is_file(follow_symlinks=False))
4296 self.assertTrue(entry.is_symlink())
4297 self.assertRaises(FileNotFoundError, entry.stat)
4298 # don't fail
4299 entry.stat(follow_symlinks=False)
4300
4301 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01004302 self.create_file("file.txt")
4303
4304 path_bytes = os.fsencode(self.path)
4305 entries = list(os.scandir(path_bytes))
4306 self.assertEqual(len(entries), 1, entries)
4307 entry = entries[0]
4308
4309 self.assertEqual(entry.name, b'file.txt')
4310 self.assertEqual(entry.path,
4311 os.fsencode(os.path.join(self.path, 'file.txt')))
4312
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03004313 def test_bytes_like(self):
4314 self.create_file("file.txt")
4315
4316 for cls in bytearray, memoryview:
4317 path_bytes = cls(os.fsencode(self.path))
4318 with self.assertWarns(DeprecationWarning):
4319 entries = list(os.scandir(path_bytes))
4320 self.assertEqual(len(entries), 1, entries)
4321 entry = entries[0]
4322
4323 self.assertEqual(entry.name, b'file.txt')
4324 self.assertEqual(entry.path,
4325 os.fsencode(os.path.join(self.path, 'file.txt')))
4326 self.assertIs(type(entry.name), bytes)
4327 self.assertIs(type(entry.path), bytes)
4328
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004329 @unittest.skipUnless(os.listdir in os.supports_fd,
4330 'fd support for listdir required for this test.')
4331 def test_fd(self):
4332 self.assertIn(os.scandir, os.supports_fd)
4333 self.create_file('file.txt')
4334 expected_names = ['file.txt']
Hai Shi0c4f0f32020-06-30 21:46:31 +08004335 if os_helper.can_symlink():
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004336 os.symlink('file.txt', os.path.join(self.path, 'link'))
4337 expected_names.append('link')
4338
4339 fd = os.open(self.path, os.O_RDONLY)
4340 try:
4341 with os.scandir(fd) as it:
4342 entries = list(it)
4343 names = [entry.name for entry in entries]
4344 self.assertEqual(sorted(names), expected_names)
4345 self.assertEqual(names, os.listdir(fd))
4346 for entry in entries:
4347 self.assertEqual(entry.path, entry.name)
4348 self.assertEqual(os.fspath(entry), entry.name)
4349 self.assertEqual(entry.is_symlink(), entry.name == 'link')
4350 if os.stat in os.supports_dir_fd:
4351 st = os.stat(entry.name, dir_fd=fd)
4352 self.assertEqual(entry.stat(), st)
4353 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4354 self.assertEqual(entry.stat(follow_symlinks=False), st)
4355 finally:
4356 os.close(fd)
4357
Victor Stinner6036e442015-03-08 01:58:04 +01004358 def test_empty_path(self):
4359 self.assertRaises(FileNotFoundError, os.scandir, '')
4360
4361 def test_consume_iterator_twice(self):
4362 self.create_file("file.txt")
4363 iterator = os.scandir(self.path)
4364
4365 entries = list(iterator)
4366 self.assertEqual(len(entries), 1, entries)
4367
4368 # check than consuming the iterator twice doesn't raise exception
4369 entries2 = list(iterator)
4370 self.assertEqual(len(entries2), 0, entries2)
4371
4372 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004373 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01004374 self.assertRaises(TypeError, os.scandir, obj)
4375
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004376 def test_close(self):
4377 self.create_file("file.txt")
4378 self.create_file("file2.txt")
4379 iterator = os.scandir(self.path)
4380 next(iterator)
4381 iterator.close()
4382 # multiple closes
4383 iterator.close()
4384 with self.check_no_resource_warning():
4385 del iterator
4386
4387 def test_context_manager(self):
4388 self.create_file("file.txt")
4389 self.create_file("file2.txt")
4390 with os.scandir(self.path) as iterator:
4391 next(iterator)
4392 with self.check_no_resource_warning():
4393 del iterator
4394
4395 def test_context_manager_close(self):
4396 self.create_file("file.txt")
4397 self.create_file("file2.txt")
4398 with os.scandir(self.path) as iterator:
4399 next(iterator)
4400 iterator.close()
4401
4402 def test_context_manager_exception(self):
4403 self.create_file("file.txt")
4404 self.create_file("file2.txt")
4405 with self.assertRaises(ZeroDivisionError):
4406 with os.scandir(self.path) as iterator:
4407 next(iterator)
4408 1/0
4409 with self.check_no_resource_warning():
4410 del iterator
4411
4412 def test_resource_warning(self):
4413 self.create_file("file.txt")
4414 self.create_file("file2.txt")
4415 iterator = os.scandir(self.path)
4416 next(iterator)
4417 with self.assertWarns(ResourceWarning):
4418 del iterator
4419 support.gc_collect()
4420 # exhausted iterator
4421 iterator = os.scandir(self.path)
4422 list(iterator)
4423 with self.check_no_resource_warning():
4424 del iterator
4425
Victor Stinner6036e442015-03-08 01:58:04 +01004426
Ethan Furmancdc08792016-06-02 15:06:09 -07004427class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004428
4429 # Abstracted so it can be overridden to test pure Python implementation
4430 # if a C version is provided.
4431 fspath = staticmethod(os.fspath)
4432
Ethan Furmancdc08792016-06-02 15:06:09 -07004433 def test_return_bytes(self):
4434 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004435 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004436
4437 def test_return_string(self):
4438 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004439 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004440
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004441 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004442 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004443 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004444
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004445 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004446 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4447 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4448
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004449 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004450 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4451 self.assertTrue(issubclass(FakePath, os.PathLike))
4452 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004453
Ethan Furmancdc08792016-06-02 15:06:09 -07004454 def test_garbage_in_exception_out(self):
4455 vapor = type('blah', (), {})
4456 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004457 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004458
4459 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004460 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004461
Brett Cannon044283a2016-07-15 10:41:49 -07004462 def test_bad_pathlike(self):
4463 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004464 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004465 # __fspath__ attribute that is not callable.
4466 c = type('foo', (), {})
4467 c.__fspath__ = 1
4468 self.assertRaises(TypeError, self.fspath, c())
4469 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004470 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004471 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004472
Bar Hareleae87e32019-12-22 11:57:27 +02004473 def test_pathlike_subclasshook(self):
4474 # bpo-38878: subclasshook causes subclass checks
4475 # true on abstract implementation.
4476 class A(os.PathLike):
4477 pass
4478 self.assertFalse(issubclass(FakePath, A))
4479 self.assertTrue(issubclass(FakePath, os.PathLike))
4480
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004481 def test_pathlike_class_getitem(self):
Guido van Rossum48b069a2020-04-07 09:50:06 -07004482 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004483
Victor Stinnerc29b5852017-11-02 07:28:27 -07004484
4485class TimesTests(unittest.TestCase):
4486 def test_times(self):
4487 times = os.times()
4488 self.assertIsInstance(times, os.times_result)
4489
4490 for field in ('user', 'system', 'children_user', 'children_system',
4491 'elapsed'):
4492 value = getattr(times, field)
4493 self.assertIsInstance(value, float)
4494
4495 if os.name == 'nt':
4496 self.assertEqual(times.children_user, 0)
4497 self.assertEqual(times.children_system, 0)
4498 self.assertEqual(times.elapsed, 0)
4499
4500
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004501# Only test if the C version is provided, otherwise TestPEP519 already tested
4502# the pure Python implementation.
4503if hasattr(os, "_fspath"):
4504 class TestPEP519PurePython(TestPEP519):
4505
4506 """Explicitly test the pure Python implementation of os.fspath()."""
4507
4508 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004509
4510
Fred Drake2e2be372001-09-20 21:33:42 +00004511if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004512 unittest.main()