blob: 96fddc7d057663eb8dabcfa2f0bd467ab7a7dc8d [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010018import select
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010023import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020027import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020028import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020029import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070030import types
Victor Stinner47aacc82015-06-12 17:26:23 +020031import unittest
32import uuid
33import warnings
34from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080035from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030037from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080038from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080039from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070040from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020041
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042try:
43 import resource
44except ImportError:
45 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010050try:
51 import _winapi
52except ImportError:
53 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020054try:
R David Murrayf2ad1732014-12-25 18:36:56 -050055 import pwd
56 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010057except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050058 all_users = []
59try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020060 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020061except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Christian Heimescd9fed62020-11-13 19:48:52 +010064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080066from test.support import unix_shell
67from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
R David Murrayf2ad1732014-12-25 18:36:56 -050070root_in_posix = False
71if hasattr(os, 'geteuid'):
72 root_in_posix = (os.geteuid() == 0)
73
Mark Dickinson7cf03892010-04-16 13:45:35 +000074# Detect whether we're on a Linux system that uses the (now outdated
75# and unmaintained) linuxthreads threading library. There's an issue
76# when combining linuxthreads with a failed execv call: see
77# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020078if hasattr(sys, 'thread_info') and sys.thread_info.version:
79 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
80else:
81 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000082
Stefan Krahebee49a2013-01-17 15:31:00 +010083# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
84HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
Berker Peksag4af23d72016-09-15 20:32:44 +030087def requires_os_func(name):
88 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
89
90
Victor Stinnerae39d232016-03-24 17:12:55 +010091def create_file(filename, content=b'content'):
92 with open(filename, "xb", 0) as fp:
93 fp.write(content)
94
95
Victor Stinner1de61d32020-11-17 23:08:10 +010096# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
97requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
98 'on AIX, splice() only accepts sockets')
99
100
Victor Stinner689830e2019-06-26 17:31:12 +0200101class MiscTests(unittest.TestCase):
102 def test_getcwd(self):
103 cwd = os.getcwd()
104 self.assertIsInstance(cwd, str)
105
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200106 def test_getcwd_long_path(self):
107 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
108 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
109 # longer path if longer paths support is enabled. Internally, the os
110 # module uses MAXPATHLEN which is at least 1024.
111 #
112 # Use a directory name of 200 characters to fit into Windows MAX_PATH
113 # limit.
114 #
115 # On Windows, the test can stop when trying to create a path longer
116 # than MAX_PATH if long paths support is disabled:
117 # see RtlAreLongPathsEnabled().
118 min_len = 2000 # characters
pxinwraa1b8a12020-11-29 04:21:30 +0800119 # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
120 # longer than PATH_MAX will fail.
121 if sys.platform == 'vxworks':
122 min_len = 1000
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200123 dirlen = 200 # characters
124 dirname = 'python_test_dir_'
125 dirname = dirname + ('a' * (dirlen - len(dirname)))
126
127 with tempfile.TemporaryDirectory() as tmpdir:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800128 with os_helper.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200129 expected = path
130
131 while True:
132 cwd = os.getcwd()
133 self.assertEqual(cwd, expected)
134
135 need = min_len - (len(cwd) + len(os.path.sep))
136 if need <= 0:
137 break
138 if len(dirname) > need and need > 0:
139 dirname = dirname[:need]
140
141 path = os.path.join(path, dirname)
142 try:
143 os.mkdir(path)
144 # On Windows, chdir() can fail
145 # even if mkdir() succeeded
146 os.chdir(path)
147 except FileNotFoundError:
148 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
149 # ERROR_FILENAME_EXCED_RANGE (206) errors
150 # ("The filename or extension is too long")
151 break
152 except OSError as exc:
153 if exc.errno == errno.ENAMETOOLONG:
154 break
155 else:
156 raise
157
158 expected = path
159
160 if support.verbose:
161 print(f"Tested current directory length: {len(cwd)}")
162
Victor Stinner689830e2019-06-26 17:31:12 +0200163 def test_getcwdb(self):
164 cwd = os.getcwdb()
165 self.assertIsInstance(cwd, bytes)
166 self.assertEqual(os.fsdecode(cwd), os.getcwd())
167
168
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000169# Tests creating TESTFN
170class FileTests(unittest.TestCase):
171 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800172 if os.path.lexists(os_helper.TESTFN):
173 os.unlink(os_helper.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000174 tearDown = setUp
175
176 def test_access(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800177 f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000178 os.close(f)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800179 self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000180
Christian Heimesfdab48e2008-01-20 09:06:41 +0000181 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800182 first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000183 # We must allocate two consecutive file descriptors, otherwise
184 # it will mess up other file descriptors (perhaps even the three
185 # standard ones).
186 second = os.dup(first)
187 try:
188 retries = 0
189 while second != first + 1:
190 os.close(first)
191 retries += 1
192 if retries > 10:
193 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000194 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000195 first, second = second, os.dup(second)
196 finally:
197 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000198 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000199 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000200 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000201
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000202 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000203 def test_rename(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800204 path = os_helper.TESTFN
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000205 old = sys.getrefcount(path)
206 self.assertRaises(TypeError, os.rename, path, 0)
207 new = sys.getrefcount(path)
208 self.assertEqual(old, new)
209
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000210 def test_read(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800211 with open(os_helper.TESTFN, "w+b") as fobj:
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000212 fobj.write(b"spam")
213 fobj.flush()
214 fd = fobj.fileno()
215 os.lseek(fd, 0, 0)
216 s = os.read(fd, 4)
217 self.assertEqual(type(s), bytes)
218 self.assertEqual(s, b"spam")
219
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200220 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200221 # Skip the test on 32-bit platforms: the number of bytes must fit in a
222 # Py_ssize_t type
223 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
224 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200225 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
226 def test_large_read(self, size):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800227 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
228 create_file(os_helper.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200229
230 # Issue #21932: Make sure that os.read() does not raise an
231 # OverflowError for size larger than INT_MAX
Hai Shi0c4f0f32020-06-30 21:46:31 +0800232 with open(os_helper.TESTFN, "rb") as fp:
Victor Stinnerb28ed922014-07-11 17:04:41 +0200233 data = os.read(fp.fileno(), size)
234
Victor Stinner8c663fd2017-11-08 14:44:44 -0800235 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200236 # operating system is free to return less bytes than requested.
237 self.assertEqual(data, b'test')
238
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000239 def test_write(self):
240 # os.write() accepts bytes- and buffer-like objects but not strings
Hai Shi0c4f0f32020-06-30 21:46:31 +0800241 fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000242 self.assertRaises(TypeError, os.write, fd, "beans")
243 os.write(fd, b"bacon\n")
244 os.write(fd, bytearray(b"eggs\n"))
245 os.write(fd, memoryview(b"spam\n"))
246 os.close(fd)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800247 with open(os_helper.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000248 self.assertEqual(fobj.read().splitlines(),
249 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000250
Victor Stinnere0daff12011-03-20 23:36:35 +0100251 def write_windows_console(self, *args):
252 retcode = subprocess.call(args,
253 # use a new console to not flood the test output
254 creationflags=subprocess.CREATE_NEW_CONSOLE,
255 # use a shell to hide the console window (SW_HIDE)
256 shell=True)
257 self.assertEqual(retcode, 0)
258
259 @unittest.skipUnless(sys.platform == 'win32',
260 'test specific to the Windows console')
261 def test_write_windows_console(self):
262 # Issue #11395: the Windows console returns an error (12: not enough
263 # space error) on writing into stdout if stdout mode is binary and the
264 # length is greater than 66,000 bytes (or less, depending on heap
265 # usage).
266 code = "print('x' * 100000)"
267 self.write_windows_console(sys.executable, "-c", code)
268 self.write_windows_console(sys.executable, "-u", "-c", code)
269
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000270 def fdopen_helper(self, *args):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800271 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200272 f = os.fdopen(fd, *args)
273 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000274
275 def test_fdopen(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800276 fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200277 os.close(fd)
278
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000279 self.fdopen_helper()
280 self.fdopen_helper('r')
281 self.fdopen_helper('r', 100)
282
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100283 def test_replace(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800284 TESTFN2 = os_helper.TESTFN + ".2"
285 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
286 self.addCleanup(os_helper.unlink, TESTFN2)
Victor Stinnerae39d232016-03-24 17:12:55 +0100287
Hai Shi0c4f0f32020-06-30 21:46:31 +0800288 create_file(os_helper.TESTFN, b"1")
Victor Stinnerae39d232016-03-24 17:12:55 +0100289 create_file(TESTFN2, b"2")
290
Hai Shi0c4f0f32020-06-30 21:46:31 +0800291 os.replace(os_helper.TESTFN, TESTFN2)
292 self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100293 with open(TESTFN2, 'r') as f:
294 self.assertEqual(f.read(), "1")
295
Martin Panterbf19d162015-09-09 01:01:13 +0000296 def test_open_keywords(self):
297 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
298 dir_fd=None)
299 os.close(f)
300
301 def test_symlink_keywords(self):
302 symlink = support.get_attribute(os, "symlink")
303 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800304 symlink(src='target', dst=os_helper.TESTFN,
Martin Panterbf19d162015-09-09 01:01:13 +0000305 target_is_directory=False, dir_fd=None)
306 except (NotImplementedError, OSError):
307 pass # No OS support or unprivileged user
308
Pablo Galindoaac4d032019-05-31 19:39:47 +0100309 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
310 def test_copy_file_range_invalid_values(self):
311 with self.assertRaises(ValueError):
312 os.copy_file_range(0, 1, -10)
313
314 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
315 def test_copy_file_range(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800316 TESTFN2 = os_helper.TESTFN + ".3"
Pablo Galindoaac4d032019-05-31 19:39:47 +0100317 data = b'0123456789'
318
Hai Shi0c4f0f32020-06-30 21:46:31 +0800319 create_file(os_helper.TESTFN, data)
320 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100321
Hai Shi0c4f0f32020-06-30 21:46:31 +0800322 in_file = open(os_helper.TESTFN, 'rb')
Pablo Galindoaac4d032019-05-31 19:39:47 +0100323 self.addCleanup(in_file.close)
324 in_fd = in_file.fileno()
325
326 out_file = open(TESTFN2, 'w+b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800327 self.addCleanup(os_helper.unlink, TESTFN2)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100328 self.addCleanup(out_file.close)
329 out_fd = out_file.fileno()
330
331 try:
332 i = os.copy_file_range(in_fd, out_fd, 5)
333 except OSError as e:
334 # Handle the case in which Python was compiled
335 # in a system with the syscall but without support
336 # in the kernel.
337 if e.errno != errno.ENOSYS:
338 raise
339 self.skipTest(e)
340 else:
341 # The number of copied bytes can be less than
342 # the number of bytes originally requested.
343 self.assertIn(i, range(0, 6));
344
345 with open(TESTFN2, 'rb') as in_file:
346 self.assertEqual(in_file.read(), data[:i])
347
348 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
349 def test_copy_file_range_offset(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800350 TESTFN4 = os_helper.TESTFN + ".4"
Pablo Galindoaac4d032019-05-31 19:39:47 +0100351 data = b'0123456789'
352 bytes_to_copy = 6
353 in_skip = 3
354 out_seek = 5
355
Hai Shi0c4f0f32020-06-30 21:46:31 +0800356 create_file(os_helper.TESTFN, data)
357 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100358
Hai Shi0c4f0f32020-06-30 21:46:31 +0800359 in_file = open(os_helper.TESTFN, 'rb')
Pablo Galindoaac4d032019-05-31 19:39:47 +0100360 self.addCleanup(in_file.close)
361 in_fd = in_file.fileno()
362
363 out_file = open(TESTFN4, 'w+b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800364 self.addCleanup(os_helper.unlink, TESTFN4)
Pablo Galindoaac4d032019-05-31 19:39:47 +0100365 self.addCleanup(out_file.close)
366 out_fd = out_file.fileno()
367
368 try:
369 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
370 offset_src=in_skip,
371 offset_dst=out_seek)
372 except OSError as e:
373 # Handle the case in which Python was compiled
374 # in a system with the syscall but without support
375 # in the kernel.
376 if e.errno != errno.ENOSYS:
377 raise
378 self.skipTest(e)
379 else:
380 # The number of copied bytes can be less than
381 # the number of bytes originally requested.
382 self.assertIn(i, range(0, bytes_to_copy+1));
383
384 with open(TESTFN4, 'rb') as in_file:
385 read = in_file.read()
386 # seeked bytes (5) are zero'ed
387 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
388 # 012 are skipped (in_skip)
389 # 345678 are copied in the file (in_skip + bytes_to_copy)
390 self.assertEqual(read[out_seek:],
391 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200392
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000393 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
394 def test_splice_invalid_values(self):
395 with self.assertRaises(ValueError):
396 os.splice(0, 1, -10)
397
398 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100399 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000400 def test_splice(self):
401 TESTFN2 = os_helper.TESTFN + ".3"
402 data = b'0123456789'
403
404 create_file(os_helper.TESTFN, data)
405 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
406
407 in_file = open(os_helper.TESTFN, 'rb')
408 self.addCleanup(in_file.close)
409 in_fd = in_file.fileno()
410
411 read_fd, write_fd = os.pipe()
412 self.addCleanup(lambda: os.close(read_fd))
413 self.addCleanup(lambda: os.close(write_fd))
414
415 try:
416 i = os.splice(in_fd, write_fd, 5)
417 except OSError as e:
418 # Handle the case in which Python was compiled
419 # in a system with the syscall but without support
420 # in the kernel.
421 if e.errno != errno.ENOSYS:
422 raise
423 self.skipTest(e)
424 else:
425 # The number of copied bytes can be less than
426 # the number of bytes originally requested.
427 self.assertIn(i, range(0, 6));
428
429 self.assertEqual(os.read(read_fd, 100), data[:i])
430
431 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100432 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000433 def test_splice_offset_in(self):
434 TESTFN4 = os_helper.TESTFN + ".4"
435 data = b'0123456789'
436 bytes_to_copy = 6
437 in_skip = 3
438
439 create_file(os_helper.TESTFN, data)
440 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
441
442 in_file = open(os_helper.TESTFN, 'rb')
443 self.addCleanup(in_file.close)
444 in_fd = in_file.fileno()
445
446 read_fd, write_fd = os.pipe()
447 self.addCleanup(lambda: os.close(read_fd))
448 self.addCleanup(lambda: os.close(write_fd))
449
450 try:
451 i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip)
452 except OSError as e:
453 # Handle the case in which Python was compiled
454 # in a system with the syscall but without support
455 # in the kernel.
456 if e.errno != errno.ENOSYS:
457 raise
458 self.skipTest(e)
459 else:
460 # The number of copied bytes can be less than
461 # the number of bytes originally requested.
462 self.assertIn(i, range(0, bytes_to_copy+1));
463
464 read = os.read(read_fd, 100)
465 # 012 are skipped (in_skip)
466 # 345678 are copied in the file (in_skip + bytes_to_copy)
467 self.assertEqual(read, data[in_skip:in_skip+i])
468
469 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
Victor Stinner1de61d32020-11-17 23:08:10 +0100470 @requires_splice_pipe
Pablo Galindoa57b3d32020-11-17 00:00:38 +0000471 def test_splice_offset_out(self):
472 TESTFN4 = os_helper.TESTFN + ".4"
473 data = b'0123456789'
474 bytes_to_copy = 6
475 out_seek = 3
476
477 create_file(os_helper.TESTFN, data)
478 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
479
480 read_fd, write_fd = os.pipe()
481 self.addCleanup(lambda: os.close(read_fd))
482 self.addCleanup(lambda: os.close(write_fd))
483 os.write(write_fd, data)
484
485 out_file = open(TESTFN4, 'w+b')
486 self.addCleanup(os_helper.unlink, TESTFN4)
487 self.addCleanup(out_file.close)
488 out_fd = out_file.fileno()
489
490 try:
491 i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek)
492 except OSError as e:
493 # Handle the case in which Python was compiled
494 # in a system with the syscall but without support
495 # in the kernel.
496 if e.errno != errno.ENOSYS:
497 raise
498 self.skipTest(e)
499 else:
500 # The number of copied bytes can be less than
501 # the number of bytes originally requested.
502 self.assertIn(i, range(0, bytes_to_copy+1));
503
504 with open(TESTFN4, 'rb') as in_file:
505 read = in_file.read()
506 # seeked bytes (5) are zero'ed
507 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
508 # 012 are skipped (in_skip)
509 # 345678 are copied in the file (in_skip + bytes_to_copy)
510 self.assertEqual(read[out_seek:], data[:i])
511
512
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000513# Test attributes on return values from os.*stat* family.
514class StatAttributeTests(unittest.TestCase):
515 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800516 self.fname = os_helper.TESTFN
517 self.addCleanup(os_helper.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100518 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000519
Antoine Pitrou38425292010-09-21 18:19:07 +0000520 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000521 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000522
523 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000524 self.assertEqual(result[stat.ST_SIZE], 3)
525 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000526
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000527 # Make sure all the attributes are there
528 members = dir(result)
529 for name in dir(stat):
530 if name[:3] == 'ST_':
531 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000532 if name.endswith("TIME"):
533 def trunc(x): return int(x)
534 else:
535 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000536 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000537 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000538 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000539
Larry Hastings6fe20b32012-04-19 15:07:49 -0700540 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700541 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700542 for name in 'st_atime st_mtime st_ctime'.split():
543 floaty = int(getattr(result, name) * 100000)
544 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700545 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700546
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000547 try:
548 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200549 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000550 except IndexError:
551 pass
552
553 # Make sure that assignment fails
554 try:
555 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200556 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000557 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000558 pass
559
560 try:
561 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200562 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000563 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000564 pass
565
566 try:
567 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200568 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000569 except AttributeError:
570 pass
571
572 # Use the stat_result constructor with a too-short tuple.
573 try:
574 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200575 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000576 except TypeError:
577 pass
578
Ezio Melotti42da6632011-03-15 05:18:48 +0200579 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000580 try:
581 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
582 except TypeError:
583 pass
584
Antoine Pitrou38425292010-09-21 18:19:07 +0000585 def test_stat_attributes(self):
586 self.check_stat_attributes(self.fname)
587
588 def test_stat_attributes_bytes(self):
589 try:
590 fname = self.fname.encode(sys.getfilesystemencoding())
591 except UnicodeEncodeError:
592 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700593 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000594
Christian Heimes25827622013-10-12 01:27:08 +0200595 def test_stat_result_pickle(self):
596 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200597 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
598 p = pickle.dumps(result, proto)
599 self.assertIn(b'stat_result', p)
600 if proto < 4:
601 self.assertIn(b'cos\nstat_result\n', p)
602 unpickled = pickle.loads(p)
603 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200604
Serhiy Storchaka43767632013-11-03 21:31:38 +0200605 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000606 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700607 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000608
609 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000610 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000611
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000612 # Make sure all the attributes are there.
613 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
614 'ffree', 'favail', 'flag', 'namemax')
615 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000616 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000617
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100618 self.assertTrue(isinstance(result.f_fsid, int))
619
620 # Test that the size of the tuple doesn't change
621 self.assertEqual(len(result), 10)
622
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000623 # Make sure that assignment really fails
624 try:
625 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200626 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000627 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000628 pass
629
630 try:
631 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200632 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000633 except AttributeError:
634 pass
635
636 # Use the constructor with a too-short tuple.
637 try:
638 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200639 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000640 except TypeError:
641 pass
642
Ezio Melotti42da6632011-03-15 05:18:48 +0200643 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000644 try:
645 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
646 except TypeError:
647 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000648
Christian Heimes25827622013-10-12 01:27:08 +0200649 @unittest.skipUnless(hasattr(os, 'statvfs'),
650 "need os.statvfs()")
651 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700652 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200653
Serhiy Storchakabad12572014-12-15 14:03:42 +0200654 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
655 p = pickle.dumps(result, proto)
656 self.assertIn(b'statvfs_result', p)
657 if proto < 4:
658 self.assertIn(b'cos\nstatvfs_result\n', p)
659 unpickled = pickle.loads(p)
660 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200661
Serhiy Storchaka43767632013-11-03 21:31:38 +0200662 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
663 def test_1686475(self):
664 # Verify that an open file can be stat'ed
665 try:
666 os.stat(r"c:\pagefile.sys")
667 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600668 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200669 except OSError as e:
670 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000671
Serhiy Storchaka43767632013-11-03 21:31:38 +0200672 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
673 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
674 def test_15261(self):
675 # Verify that stat'ing a closed fd does not cause crash
676 r, w = os.pipe()
677 try:
678 os.stat(r) # should not raise error
679 finally:
680 os.close(r)
681 os.close(w)
682 with self.assertRaises(OSError) as ctx:
683 os.stat(r)
684 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100685
Zachary Ware63f277b2014-06-19 09:46:37 -0500686 def check_file_attributes(self, result):
687 self.assertTrue(hasattr(result, 'st_file_attributes'))
688 self.assertTrue(isinstance(result.st_file_attributes, int))
689 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
690
691 @unittest.skipUnless(sys.platform == "win32",
692 "st_file_attributes is Win32 specific")
693 def test_file_attributes(self):
694 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
695 result = os.stat(self.fname)
696 self.check_file_attributes(result)
697 self.assertEqual(
698 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
699 0)
700
701 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800702 dirname = os_helper.TESTFN + "dir"
Victor Stinner47aacc82015-06-12 17:26:23 +0200703 os.mkdir(dirname)
704 self.addCleanup(os.rmdir, dirname)
705
706 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500707 self.check_file_attributes(result)
708 self.assertEqual(
709 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
710 stat.FILE_ATTRIBUTE_DIRECTORY)
711
Berker Peksag0b4dc482016-09-17 15:49:59 +0300712 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
713 def test_access_denied(self):
714 # Default to FindFirstFile WIN32_FIND_DATA when access is
715 # denied. See issue 28075.
716 # os.environ['TEMP'] should be located on a volume that
717 # supports file ACLs.
718 fname = os.path.join(os.environ['TEMP'], self.fname)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800719 self.addCleanup(os_helper.unlink, fname)
Berker Peksag0b4dc482016-09-17 15:49:59 +0300720 create_file(fname, b'ABC')
721 # Deny the right to [S]YNCHRONIZE on the file to
722 # force CreateFile to fail with ERROR_ACCESS_DENIED.
723 DETACHED_PROCESS = 8
724 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500725 # bpo-30584: Use security identifier *S-1-5-32-545 instead
726 # of localized "Users" to not depend on the locale.
727 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300728 creationflags=DETACHED_PROCESS
729 )
730 result = os.stat(fname)
731 self.assertNotEqual(result.st_size, 0)
732
Steve Dower772ec0f2019-09-04 14:42:54 -0700733 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
734 def test_stat_block_device(self):
735 # bpo-38030: os.stat fails for block devices
736 # Test a filename like "//./C:"
737 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
738 result = os.stat(fname)
739 self.assertEqual(result.st_mode, stat.S_IFBLK)
740
Victor Stinner47aacc82015-06-12 17:26:23 +0200741
742class UtimeTests(unittest.TestCase):
743 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800744 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200745 self.fname = os.path.join(self.dirname, "f1")
746
Hai Shi0c4f0f32020-06-30 21:46:31 +0800747 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200748 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100749 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200750
Victor Stinner47aacc82015-06-12 17:26:23 +0200751 def support_subsecond(self, filename):
752 # Heuristic to check if the filesystem supports timestamp with
753 # subsecond resolution: check if float and int timestamps are different
754 st = os.stat(filename)
755 return ((st.st_atime != st[7])
756 or (st.st_mtime != st[8])
757 or (st.st_ctime != st[9]))
758
759 def _test_utime(self, set_time, filename=None):
760 if not filename:
761 filename = self.fname
762
763 support_subsecond = self.support_subsecond(filename)
764 if support_subsecond:
765 # Timestamp with a resolution of 1 microsecond (10^-6).
766 #
767 # The resolution of the C internal function used by os.utime()
768 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
769 # test with a resolution of 1 ns requires more work:
770 # see the issue #15745.
771 atime_ns = 1002003000 # 1.002003 seconds
772 mtime_ns = 4005006000 # 4.005006 seconds
773 else:
774 # use a resolution of 1 second
775 atime_ns = 5 * 10**9
776 mtime_ns = 8 * 10**9
777
778 set_time(filename, (atime_ns, mtime_ns))
779 st = os.stat(filename)
780
781 if support_subsecond:
782 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
783 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
784 else:
785 self.assertEqual(st.st_atime, atime_ns * 1e-9)
786 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
787 self.assertEqual(st.st_atime_ns, atime_ns)
788 self.assertEqual(st.st_mtime_ns, mtime_ns)
789
790 def test_utime(self):
791 def set_time(filename, ns):
792 # test the ns keyword parameter
793 os.utime(filename, ns=ns)
794 self._test_utime(set_time)
795
796 @staticmethod
797 def ns_to_sec(ns):
798 # Convert a number of nanosecond (int) to a number of seconds (float).
799 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
800 # issue, os.utime() rounds towards minus infinity.
801 return (ns * 1e-9) + 0.5e-9
802
803 def test_utime_by_indexed(self):
804 # pass times as floating point seconds as the second indexed parameter
805 def set_time(filename, ns):
806 atime_ns, mtime_ns = ns
807 atime = self.ns_to_sec(atime_ns)
808 mtime = self.ns_to_sec(mtime_ns)
809 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
810 # or utime(time_t)
811 os.utime(filename, (atime, mtime))
812 self._test_utime(set_time)
813
814 def test_utime_by_times(self):
815 def set_time(filename, ns):
816 atime_ns, mtime_ns = ns
817 atime = self.ns_to_sec(atime_ns)
818 mtime = self.ns_to_sec(mtime_ns)
819 # test the times keyword parameter
820 os.utime(filename, times=(atime, mtime))
821 self._test_utime(set_time)
822
823 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
824 "follow_symlinks support for utime required "
825 "for this test.")
826 def test_utime_nofollow_symlinks(self):
827 def set_time(filename, ns):
828 # use follow_symlinks=False to test utimensat(timespec)
829 # or lutimes(timeval)
830 os.utime(filename, ns=ns, follow_symlinks=False)
831 self._test_utime(set_time)
832
833 @unittest.skipUnless(os.utime in os.supports_fd,
834 "fd support for utime required for this test.")
835 def test_utime_fd(self):
836 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100837 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200838 # use a file descriptor to test futimens(timespec)
839 # or futimes(timeval)
840 os.utime(fp.fileno(), ns=ns)
841 self._test_utime(set_time)
842
843 @unittest.skipUnless(os.utime in os.supports_dir_fd,
844 "dir_fd support for utime required for this test.")
845 def test_utime_dir_fd(self):
846 def set_time(filename, ns):
847 dirname, name = os.path.split(filename)
848 dirfd = os.open(dirname, os.O_RDONLY)
849 try:
850 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
851 os.utime(name, dir_fd=dirfd, ns=ns)
852 finally:
853 os.close(dirfd)
854 self._test_utime(set_time)
855
856 def test_utime_directory(self):
857 def set_time(filename, ns):
858 # test calling os.utime() on a directory
859 os.utime(filename, ns=ns)
860 self._test_utime(set_time, filename=self.dirname)
861
862 def _test_utime_current(self, set_time):
863 # Get the system clock
864 current = time.time()
865
866 # Call os.utime() to set the timestamp to the current system clock
867 set_time(self.fname)
868
869 if not self.support_subsecond(self.fname):
870 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700871 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200872 # On Windows, the usual resolution of time.time() is 15.6 ms.
873 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700874 #
875 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
876 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200877 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200878 st = os.stat(self.fname)
879 msg = ("st_time=%r, current=%r, dt=%r"
880 % (st.st_mtime, current, st.st_mtime - current))
881 self.assertAlmostEqual(st.st_mtime, current,
882 delta=delta, msg=msg)
883
884 def test_utime_current(self):
885 def set_time(filename):
886 # Set to the current time in the new way
887 os.utime(self.fname)
888 self._test_utime_current(set_time)
889
890 def test_utime_current_old(self):
891 def set_time(filename):
892 # Set to the current time in the old explicit way.
893 os.utime(self.fname, None)
894 self._test_utime_current(set_time)
895
896 def get_file_system(self, path):
897 if sys.platform == 'win32':
898 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
899 import ctypes
900 kernel32 = ctypes.windll.kernel32
901 buf = ctypes.create_unicode_buffer("", 100)
902 ok = kernel32.GetVolumeInformationW(root, None, 0,
903 None, None, None,
904 buf, len(buf))
905 if ok:
906 return buf.value
907 # return None if the filesystem is unknown
908
909 def test_large_time(self):
910 # Many filesystems are limited to the year 2038. At least, the test
911 # pass with NTFS filesystem.
912 if self.get_file_system(self.dirname) != "NTFS":
913 self.skipTest("requires NTFS")
914
915 large = 5000000000 # some day in 2128
916 os.utime(self.fname, (large, large))
917 self.assertEqual(os.stat(self.fname).st_mtime, large)
918
919 def test_utime_invalid_arguments(self):
920 # seconds and nanoseconds parameters are mutually exclusive
921 with self.assertRaises(ValueError):
922 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200923 with self.assertRaises(TypeError):
924 os.utime(self.fname, [5, 5])
925 with self.assertRaises(TypeError):
926 os.utime(self.fname, (5,))
927 with self.assertRaises(TypeError):
928 os.utime(self.fname, (5, 5, 5))
929 with self.assertRaises(TypeError):
930 os.utime(self.fname, ns=[5, 5])
931 with self.assertRaises(TypeError):
932 os.utime(self.fname, ns=(5,))
933 with self.assertRaises(TypeError):
934 os.utime(self.fname, ns=(5, 5, 5))
935
936 if os.utime not in os.supports_follow_symlinks:
937 with self.assertRaises(NotImplementedError):
938 os.utime(self.fname, (5, 5), follow_symlinks=False)
939 if os.utime not in os.supports_fd:
940 with open(self.fname, 'wb', 0) as fp:
941 with self.assertRaises(TypeError):
942 os.utime(fp.fileno(), (5, 5))
943 if os.utime not in os.supports_dir_fd:
944 with self.assertRaises(NotImplementedError):
945 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200946
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300947 @support.cpython_only
948 def test_issue31577(self):
949 # The interpreter shouldn't crash in case utime() received a bad
950 # ns argument.
951 def get_bad_int(divmod_ret_val):
952 class BadInt:
953 def __divmod__(*args):
954 return divmod_ret_val
955 return BadInt()
956 with self.assertRaises(TypeError):
957 os.utime(self.fname, ns=(get_bad_int(42), 1))
958 with self.assertRaises(TypeError):
959 os.utime(self.fname, ns=(get_bad_int(()), 1))
960 with self.assertRaises(TypeError):
961 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
962
Victor Stinner47aacc82015-06-12 17:26:23 +0200963
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000964from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000965
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000966class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000967 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000968 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000969
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000970 def setUp(self):
971 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000972 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000973 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000974 for key, value in self._reference().items():
975 os.environ[key] = value
976
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000977 def tearDown(self):
978 os.environ.clear()
979 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000980 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000981 os.environb.clear()
982 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000983
Christian Heimes90333392007-11-01 19:08:42 +0000984 def _reference(self):
985 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
986
987 def _empty_mapping(self):
988 os.environ.clear()
989 return os.environ
990
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000991 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200992 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
993 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +0800994 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Martin v. Löwis5510f652005-02-17 21:23:20 +0000995 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000996 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300997 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200998 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300999 value = popen.read().strip()
1000 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +00001001
Xavier de Gayed1415312016-07-22 12:15:29 +02001002 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1003 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +08001004 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Christian Heimes1a13d592007-11-08 14:16:55 +00001005 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +02001006 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
1007 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +03001008 it = iter(popen)
1009 self.assertEqual(next(it), "line1\n")
1010 self.assertEqual(next(it), "line2\n")
1011 self.assertEqual(next(it), "line3\n")
1012 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +00001013
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001014 # Verify environ keys and values from the OS are of the
1015 # correct str type.
1016 def test_keyvalue_types(self):
1017 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001018 self.assertEqual(type(key), str)
1019 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001020
Christian Heimes90333392007-11-01 19:08:42 +00001021 def test_items(self):
1022 for key, value in self._reference().items():
1023 self.assertEqual(os.environ.get(key), value)
1024
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001025 # Issue 7310
1026 def test___repr__(self):
1027 """Check that the repr() of os.environ looks like environ({...})."""
1028 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +00001029 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
1030 '{!r}: {!r}'.format(key, value)
1031 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001032
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +00001033 def test_get_exec_path(self):
1034 defpath_list = os.defpath.split(os.pathsep)
1035 test_path = ['/monty', '/python', '', '/flying/circus']
1036 test_env = {'PATH': os.pathsep.join(test_path)}
1037
1038 saved_environ = os.environ
1039 try:
1040 os.environ = dict(test_env)
1041 # Test that defaulting to os.environ works.
1042 self.assertSequenceEqual(test_path, os.get_exec_path())
1043 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
1044 finally:
1045 os.environ = saved_environ
1046
1047 # No PATH environment variable
1048 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
1049 # Empty PATH environment variable
1050 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
1051 # Supplied PATH environment variable
1052 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
1053
Victor Stinnerb745a742010-05-18 17:17:23 +00001054 if os.supports_bytes_environ:
1055 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +00001056 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001057 # ignore BytesWarning warning
1058 with warnings.catch_warnings(record=True):
1059 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +00001060 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001061 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +00001062 pass
1063 else:
1064 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +00001065
1066 # bytes key and/or value
1067 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
1068 ['abc'])
1069 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
1070 ['abc'])
1071 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
1072 ['abc'])
1073
1074 @unittest.skipUnless(os.supports_bytes_environ,
1075 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +00001076 def test_environb(self):
1077 # os.environ -> os.environb
1078 value = 'euro\u20ac'
1079 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001080 value_bytes = value.encode(sys.getfilesystemencoding(),
1081 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +00001082 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001083 msg = "U+20AC character is not encodable to %s" % (
1084 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +00001085 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +00001086 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001087 self.assertEqual(os.environ['unicode'], value)
1088 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +00001089
1090 # os.environb -> os.environ
1091 value = b'\xff'
1092 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001093 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +00001094 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001095 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001096
Victor Stinner161e7b32020-01-24 11:53:44 +01001097 def test_putenv_unsetenv(self):
1098 name = "PYTHONTESTVAR"
1099 value = "testvalue"
1100 code = f'import os; print(repr(os.environ.get({name!r})))'
1101
Hai Shi0c4f0f32020-06-30 21:46:31 +08001102 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner161e7b32020-01-24 11:53:44 +01001103 env.pop(name, None)
1104
1105 os.putenv(name, value)
1106 proc = subprocess.run([sys.executable, '-c', code], check=True,
1107 stdout=subprocess.PIPE, text=True)
1108 self.assertEqual(proc.stdout.rstrip(), repr(value))
1109
1110 os.unsetenv(name)
1111 proc = subprocess.run([sys.executable, '-c', code], check=True,
1112 stdout=subprocess.PIPE, text=True)
1113 self.assertEqual(proc.stdout.rstrip(), repr(None))
1114
Victor Stinner13ff2452018-01-22 18:32:50 +01001115 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +01001116 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +01001117 def test_putenv_unsetenv_error(self):
1118 # Empty variable name is invalid.
1119 # "=" and null character are not allowed in a variable name.
1120 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
1121 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
1122 self.assertRaises((OSError, ValueError), os.unsetenv, name)
1123
Victor Stinnerb73dd022020-01-22 21:11:17 +01001124 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +01001125 # On Windows, an environment variable string ("name=value" string)
1126 # is limited to 32,767 characters
1127 longstr = 'x' * 32_768
1128 self.assertRaises(ValueError, os.putenv, longstr, "1")
1129 self.assertRaises(ValueError, os.putenv, "X", longstr)
1130 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +01001131
Victor Stinner6d101392013-04-14 16:35:04 +02001132 def test_key_type(self):
1133 missing = 'missingkey'
1134 self.assertNotIn(missing, os.environ)
1135
Victor Stinner839e5ea2013-04-14 16:43:03 +02001136 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001137 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001138 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001139 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001140
Victor Stinner839e5ea2013-04-14 16:43:03 +02001141 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001142 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001143 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001144 self.assertTrue(cm.exception.__suppress_context__)
1145
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001146 def _test_environ_iteration(self, collection):
1147 iterator = iter(collection)
1148 new_key = "__new_key__"
1149
1150 next(iterator) # start iteration over os.environ.items
1151
1152 # add a new key in os.environ mapping
1153 os.environ[new_key] = "test_environ_iteration"
1154
1155 try:
1156 next(iterator) # force iteration over modified mapping
1157 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1158 finally:
1159 del os.environ[new_key]
1160
1161 def test_iter_error_when_changing_os_environ(self):
1162 self._test_environ_iteration(os.environ)
1163
1164 def test_iter_error_when_changing_os_environ_items(self):
1165 self._test_environ_iteration(os.environ.items())
1166
1167 def test_iter_error_when_changing_os_environ_values(self):
1168 self._test_environ_iteration(os.environ.values())
1169
Charles Burklandd648ef12020-03-13 09:04:43 -07001170 def _test_underlying_process_env(self, var, expected):
1171 if not (unix_shell and os.path.exists(unix_shell)):
1172 return
1173
1174 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1175 value = popen.read().strip()
1176
1177 self.assertEqual(expected, value)
1178
1179 def test_or_operator(self):
1180 overridden_key = '_TEST_VAR_'
1181 original_value = 'original_value'
1182 os.environ[overridden_key] = original_value
1183
1184 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1185 expected = dict(os.environ)
1186 expected.update(new_vars_dict)
1187
1188 actual = os.environ | new_vars_dict
1189 self.assertDictEqual(expected, actual)
1190 self.assertEqual('3', actual[overridden_key])
1191
1192 new_vars_items = new_vars_dict.items()
1193 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1194
1195 self._test_underlying_process_env('_A_', '')
1196 self._test_underlying_process_env(overridden_key, original_value)
1197
1198 def test_ior_operator(self):
1199 overridden_key = '_TEST_VAR_'
1200 os.environ[overridden_key] = 'original_value'
1201
1202 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1203 expected = dict(os.environ)
1204 expected.update(new_vars_dict)
1205
1206 os.environ |= new_vars_dict
1207 self.assertEqual(expected, os.environ)
1208 self.assertEqual('3', os.environ[overridden_key])
1209
1210 self._test_underlying_process_env('_A_', '1')
1211 self._test_underlying_process_env(overridden_key, '3')
1212
1213 def test_ior_operator_invalid_dicts(self):
1214 os_environ_copy = os.environ.copy()
1215 with self.assertRaises(TypeError):
1216 dict_with_bad_key = {1: '_A_'}
1217 os.environ |= dict_with_bad_key
1218
1219 with self.assertRaises(TypeError):
1220 dict_with_bad_val = {'_A_': 1}
1221 os.environ |= dict_with_bad_val
1222
1223 # Check nothing was added.
1224 self.assertEqual(os_environ_copy, os.environ)
1225
1226 def test_ior_operator_key_value_iterable(self):
1227 overridden_key = '_TEST_VAR_'
1228 os.environ[overridden_key] = 'original_value'
1229
1230 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1231 expected = dict(os.environ)
1232 expected.update(new_vars_items)
1233
1234 os.environ |= new_vars_items
1235 self.assertEqual(expected, os.environ)
1236 self.assertEqual('3', os.environ[overridden_key])
1237
1238 self._test_underlying_process_env('_A_', '1')
1239 self._test_underlying_process_env(overridden_key, '3')
1240
1241 def test_ror_operator(self):
1242 overridden_key = '_TEST_VAR_'
1243 original_value = 'original_value'
1244 os.environ[overridden_key] = original_value
1245
1246 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1247 expected = dict(new_vars_dict)
1248 expected.update(os.environ)
1249
1250 actual = new_vars_dict | os.environ
1251 self.assertDictEqual(expected, actual)
1252 self.assertEqual(original_value, actual[overridden_key])
1253
1254 new_vars_items = new_vars_dict.items()
1255 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1256
1257 self._test_underlying_process_env('_A_', '')
1258 self._test_underlying_process_env(overridden_key, original_value)
1259
Victor Stinner6d101392013-04-14 16:35:04 +02001260
Tim Petersc4e09402003-04-25 07:11:48 +00001261class WalkTests(unittest.TestCase):
1262 """Tests for os.walk()."""
1263
Victor Stinner0561c532015-03-12 10:28:24 +01001264 # Wrapper to hide minor differences between os.walk and os.fwalk
1265 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001266 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001267 if 'follow_symlinks' in kwargs:
1268 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001269 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001270
Charles-François Natali7372b062012-02-05 15:15:38 +01001271 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001272 join = os.path.join
Hai Shi0c4f0f32020-06-30 21:46:31 +08001273 self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001274
1275 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001276 # TESTFN/
1277 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001278 # tmp1
1279 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001280 # tmp2
1281 # SUB11/ no kids
1282 # SUB2/ a file kid and a dirsymlink kid
1283 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001284 # SUB21/ not readable
1285 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001286 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001287 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001288 # broken_link2
1289 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001290 # TEST2/
1291 # tmp4 a lone file
Hai Shi0c4f0f32020-06-30 21:46:31 +08001292 self.walk_path = join(os_helper.TESTFN, "TEST1")
Victor Stinner0561c532015-03-12 10:28:24 +01001293 self.sub1_path = join(self.walk_path, "SUB1")
1294 self.sub11_path = join(self.sub1_path, "SUB11")
1295 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001296 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001297 tmp1_path = join(self.walk_path, "tmp1")
1298 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001299 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001300 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001301 self.link_path = join(sub2_path, "link")
Hai Shi0c4f0f32020-06-30 21:46:31 +08001302 t2_path = join(os_helper.TESTFN, "TEST2")
1303 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001304 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001305 broken_link2_path = join(sub2_path, "broken_link2")
1306 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001307
1308 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001309 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001310 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001311 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001312 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001313
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001314 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03001315 with open(path, "x", encoding='utf-8') as f:
Victor Stinnere77c9742016-03-25 10:28:23 +01001316 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001317
Hai Shi0c4f0f32020-06-30 21:46:31 +08001318 if os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001319 os.symlink(os.path.abspath(t2_path), self.link_path)
1320 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001321 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1322 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001323 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001324 ["broken_link", "broken_link2", "broken_link3",
1325 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001326 else:
pxinwr3e028b22019-02-15 13:04:47 +08001327 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001328
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001329 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001330 try:
1331 os.listdir(sub21_path)
1332 except PermissionError:
1333 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1334 else:
1335 os.chmod(sub21_path, stat.S_IRWXU)
1336 os.unlink(tmp5_path)
1337 os.rmdir(sub21_path)
1338 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001339
Victor Stinner0561c532015-03-12 10:28:24 +01001340 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001341 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001342 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001343
Tim Petersc4e09402003-04-25 07:11:48 +00001344 self.assertEqual(len(all), 4)
1345 # We can't know which order SUB1 and SUB2 will appear in.
1346 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1347 # flipped: TESTFN, SUB2, SUB1, SUB11
1348 flipped = all[0][1][0] != "SUB1"
1349 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001350 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001351 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001352 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1353 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1354 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1355 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001356
Brett Cannon3f9183b2016-08-26 14:44:48 -07001357 def test_walk_prune(self, walk_path=None):
1358 if walk_path is None:
1359 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001360 # Prune the search.
1361 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001362 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001363 all.append((root, dirs, files))
1364 # Don't descend into SUB1.
1365 if 'SUB1' in dirs:
1366 # Note that this also mutates the dirs we appended to all!
1367 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001368
Victor Stinner0561c532015-03-12 10:28:24 +01001369 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001370 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001371
1372 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001373 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001374 self.assertEqual(all[1], self.sub2_tree)
1375
Brett Cannon3f9183b2016-08-26 14:44:48 -07001376 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001377 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001378
Victor Stinner0561c532015-03-12 10:28:24 +01001379 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001380 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001381 all = list(self.walk(self.walk_path, topdown=False))
1382
Victor Stinner53b0a412016-03-26 01:12:36 +01001383 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001384 # We can't know which order SUB1 and SUB2 will appear in.
1385 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1386 # flipped: SUB2, SUB11, SUB1, TESTFN
1387 flipped = all[3][1][0] != "SUB1"
1388 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001389 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001390 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001391 self.assertEqual(all[3],
1392 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1393 self.assertEqual(all[flipped],
1394 (self.sub11_path, [], []))
1395 self.assertEqual(all[flipped + 1],
1396 (self.sub1_path, ["SUB11"], ["tmp2"]))
1397 self.assertEqual(all[2 - 2 * flipped],
1398 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001399
Victor Stinner0561c532015-03-12 10:28:24 +01001400 def test_walk_symlink(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001401 if not os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001402 self.skipTest("need symlink support")
1403
1404 # Walk, following symlinks.
1405 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1406 for root, dirs, files in walk_it:
1407 if root == self.link_path:
1408 self.assertEqual(dirs, [])
1409 self.assertEqual(files, ["tmp4"])
1410 break
1411 else:
1412 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001413
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001414 def test_walk_bad_dir(self):
1415 # Walk top-down.
1416 errors = []
1417 walk_it = self.walk(self.walk_path, onerror=errors.append)
1418 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001419 self.assertEqual(errors, [])
1420 dir1 = 'SUB1'
1421 path1 = os.path.join(root, dir1)
1422 path1new = os.path.join(root, dir1 + '.new')
1423 os.rename(path1, path1new)
1424 try:
1425 roots = [r for r, d, f in walk_it]
1426 self.assertTrue(errors)
1427 self.assertNotIn(path1, roots)
1428 self.assertNotIn(path1new, roots)
1429 for dir2 in dirs:
1430 if dir2 != dir1:
1431 self.assertIn(os.path.join(root, dir2), roots)
1432 finally:
1433 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001434
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001435 def test_walk_many_open_files(self):
1436 depth = 30
Hai Shi0c4f0f32020-06-30 21:46:31 +08001437 base = os.path.join(os_helper.TESTFN, 'deep')
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001438 p = os.path.join(base, *(['d']*depth))
1439 os.makedirs(p)
1440
1441 iters = [self.walk(base, topdown=False) for j in range(100)]
1442 for i in range(depth + 1):
1443 expected = (p, ['d'] if i else [], [])
1444 for it in iters:
1445 self.assertEqual(next(it), expected)
1446 p = os.path.dirname(p)
1447
1448 iters = [self.walk(base, topdown=True) for j in range(100)]
1449 p = base
1450 for i in range(depth + 1):
1451 expected = (p, ['d'] if i < depth else [], [])
1452 for it in iters:
1453 self.assertEqual(next(it), expected)
1454 p = os.path.join(p, 'd')
1455
Charles-François Natali7372b062012-02-05 15:15:38 +01001456
1457@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1458class FwalkTests(WalkTests):
1459 """Tests for os.fwalk()."""
1460
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001461 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001462 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001463 yield (root, dirs, files)
1464
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001465 def fwalk(self, *args, **kwargs):
1466 return os.fwalk(*args, **kwargs)
1467
Larry Hastingsc48fe982012-06-25 04:49:05 -07001468 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1469 """
1470 compare with walk() results.
1471 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001472 walk_kwargs = walk_kwargs.copy()
1473 fwalk_kwargs = fwalk_kwargs.copy()
1474 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1475 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1476 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001477
Charles-François Natali7372b062012-02-05 15:15:38 +01001478 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001479 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001480 expected[root] = (set(dirs), set(files))
1481
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001482 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001483 self.assertIn(root, expected)
1484 self.assertEqual(expected[root], (set(dirs), set(files)))
1485
Larry Hastingsc48fe982012-06-25 04:49:05 -07001486 def test_compare_to_walk(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001487 kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001488 self._compare_to_walk(kwargs, kwargs)
1489
Charles-François Natali7372b062012-02-05 15:15:38 +01001490 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001491 try:
1492 fd = os.open(".", os.O_RDONLY)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001493 walk_kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001494 fwalk_kwargs = walk_kwargs.copy()
1495 fwalk_kwargs['dir_fd'] = fd
1496 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1497 finally:
1498 os.close(fd)
1499
1500 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001501 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001502 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001503 args = os_helper.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001504 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001505 # check that the FD is valid
1506 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001507 # redundant check
1508 os.stat(rootfd)
1509 # check that listdir() returns consistent information
1510 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001511
1512 def test_fd_leak(self):
1513 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1514 # we both check that calling fwalk() a large number of times doesn't
1515 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1516 minfd = os.dup(1)
1517 os.close(minfd)
1518 for i in range(256):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001519 for x in self.fwalk(os_helper.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001520 pass
1521 newfd = os.dup(1)
1522 self.addCleanup(os.close, newfd)
1523 self.assertEqual(newfd, minfd)
1524
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001525 # fwalk() keeps file descriptors open
1526 test_walk_many_open_files = None
1527
1528
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001529class BytesWalkTests(WalkTests):
1530 """Tests for os.walk() with bytes."""
1531 def walk(self, top, **kwargs):
1532 if 'follow_symlinks' in kwargs:
1533 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1534 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1535 root = os.fsdecode(broot)
1536 dirs = list(map(os.fsdecode, bdirs))
1537 files = list(map(os.fsdecode, bfiles))
1538 yield (root, dirs, files)
1539 bdirs[:] = list(map(os.fsencode, dirs))
1540 bfiles[:] = list(map(os.fsencode, files))
1541
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001542@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1543class BytesFwalkTests(FwalkTests):
1544 """Tests for os.walk() with bytes."""
1545 def fwalk(self, top='.', *args, **kwargs):
1546 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1547 root = os.fsdecode(broot)
1548 dirs = list(map(os.fsdecode, bdirs))
1549 files = list(map(os.fsdecode, bfiles))
1550 yield (root, dirs, files, topfd)
1551 bdirs[:] = list(map(os.fsencode, dirs))
1552 bfiles[:] = list(map(os.fsencode, files))
1553
Charles-François Natali7372b062012-02-05 15:15:38 +01001554
Guido van Rossume7ba4952007-06-06 23:52:48 +00001555class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001556 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001557 os.mkdir(os_helper.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001558
1559 def test_makedir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001560 base = os_helper.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001561 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1562 os.makedirs(path) # Should work
1563 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1564 os.makedirs(path)
1565
1566 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001567 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001568 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1569 os.makedirs(path)
1570 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1571 'dir5', 'dir6')
1572 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001573
Serhiy Storchakae304e332017-03-24 13:27:42 +02001574 def test_mode(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001575 with os_helper.temp_umask(0o002):
1576 base = os_helper.TESTFN
Serhiy Storchakae304e332017-03-24 13:27:42 +02001577 parent = os.path.join(base, 'dir1')
1578 path = os.path.join(parent, 'dir2')
1579 os.makedirs(path, 0o555)
1580 self.assertTrue(os.path.exists(path))
1581 self.assertTrue(os.path.isdir(path))
1582 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001583 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1584 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001585
Terry Reedy5a22b652010-12-02 07:05:56 +00001586 def test_exist_ok_existing_directory(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001587 path = os.path.join(os_helper.TESTFN, 'dir1')
Terry Reedy5a22b652010-12-02 07:05:56 +00001588 mode = 0o777
1589 old_mask = os.umask(0o022)
1590 os.makedirs(path, mode)
1591 self.assertRaises(OSError, os.makedirs, path, mode)
1592 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001593 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001594 os.makedirs(path, mode=mode, exist_ok=True)
1595 os.umask(old_mask)
1596
Martin Pantera82642f2015-11-19 04:48:44 +00001597 # Issue #25583: A drive root could raise PermissionError on Windows
1598 os.makedirs(os.path.abspath('/'), exist_ok=True)
1599
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001600 def test_exist_ok_s_isgid_directory(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001601 path = os.path.join(os_helper.TESTFN, 'dir1')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001602 S_ISGID = stat.S_ISGID
1603 mode = 0o777
1604 old_mask = os.umask(0o022)
1605 try:
1606 existing_testfn_mode = stat.S_IMODE(
Hai Shi0c4f0f32020-06-30 21:46:31 +08001607 os.lstat(os_helper.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001608 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08001609 os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001610 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001611 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Hai Shi0c4f0f32020-06-30 21:46:31 +08001612 if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001613 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1614 # The os should apply S_ISGID from the parent dir for us, but
1615 # this test need not depend on that behavior. Be explicit.
1616 os.makedirs(path, mode | S_ISGID)
1617 # http://bugs.python.org/issue14992
1618 # Should not fail when the bit is already set.
1619 os.makedirs(path, mode, exist_ok=True)
1620 # remove the bit.
1621 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001622 # May work even when the bit is not already set when demanded.
1623 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001624 finally:
1625 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001626
1627 def test_exist_ok_existing_regular_file(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001628 base = os_helper.TESTFN
1629 path = os.path.join(os_helper.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001630 with open(path, 'w') as f:
1631 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001632 self.assertRaises(OSError, os.makedirs, path)
1633 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1634 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1635 os.remove(path)
1636
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001637 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001638 path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001639 'dir4', 'dir5', 'dir6')
1640 # If the tests failed, the bottom-most directory ('../dir6')
1641 # may not have been created, so we look for the outermost directory
1642 # that exists.
Hai Shi0c4f0f32020-06-30 21:46:31 +08001643 while not os.path.exists(path) and path != os_helper.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001644 path = os.path.dirname(path)
1645
1646 os.removedirs(path)
1647
Andrew Svetlov405faed2012-12-25 12:18:09 +02001648
R David Murrayf2ad1732014-12-25 18:36:56 -05001649@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1650class ChownFileTests(unittest.TestCase):
1651
Berker Peksag036a71b2015-07-21 09:29:48 +03001652 @classmethod
1653 def setUpClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001654 os.mkdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001655
1656 def test_chown_uid_gid_arguments_must_be_index(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001657 stat = os.stat(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001658 uid = stat.st_uid
1659 gid = stat.st_gid
1660 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001661 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid)
1662 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value)
1663 self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
1664 self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))
R David Murrayf2ad1732014-12-25 18:36:56 -05001665
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001666 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1667 def test_chown_gid(self):
1668 groups = os.getgroups()
1669 if len(groups) < 2:
1670 self.skipTest("test needs at least 2 groups")
1671
R David Murrayf2ad1732014-12-25 18:36:56 -05001672 gid_1, gid_2 = groups[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001673 uid = os.stat(os_helper.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001674
Hai Shi0c4f0f32020-06-30 21:46:31 +08001675 os.chown(os_helper.TESTFN, uid, gid_1)
1676 gid = os.stat(os_helper.TESTFN).st_gid
R David Murrayf2ad1732014-12-25 18:36:56 -05001677 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001678
Hai Shi0c4f0f32020-06-30 21:46:31 +08001679 os.chown(os_helper.TESTFN, uid, gid_2)
1680 gid = os.stat(os_helper.TESTFN).st_gid
R David Murrayf2ad1732014-12-25 18:36:56 -05001681 self.assertEqual(gid, gid_2)
1682
1683 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1684 "test needs root privilege and more than one user")
1685 def test_chown_with_root(self):
1686 uid_1, uid_2 = all_users[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001687 gid = os.stat(os_helper.TESTFN).st_gid
1688 os.chown(os_helper.TESTFN, uid_1, gid)
1689 uid = os.stat(os_helper.TESTFN).st_uid
R David Murrayf2ad1732014-12-25 18:36:56 -05001690 self.assertEqual(uid, uid_1)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001691 os.chown(os_helper.TESTFN, uid_2, gid)
1692 uid = os.stat(os_helper.TESTFN).st_uid
R David Murrayf2ad1732014-12-25 18:36:56 -05001693 self.assertEqual(uid, uid_2)
1694
1695 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1696 "test needs non-root account and more than one user")
1697 def test_chown_without_permission(self):
1698 uid_1, uid_2 = all_users[:2]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001699 gid = os.stat(os_helper.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001700 with self.assertRaises(PermissionError):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001701 os.chown(os_helper.TESTFN, uid_1, gid)
1702 os.chown(os_helper.TESTFN, uid_2, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001703
Berker Peksag036a71b2015-07-21 09:29:48 +03001704 @classmethod
1705 def tearDownClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001706 os.rmdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001707
1708
Andrew Svetlov405faed2012-12-25 12:18:09 +02001709class RemoveDirsTests(unittest.TestCase):
1710 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001711 os.makedirs(os_helper.TESTFN)
Andrew Svetlov405faed2012-12-25 12:18:09 +02001712
1713 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001714 os_helper.rmtree(os_helper.TESTFN)
Andrew Svetlov405faed2012-12-25 12:18:09 +02001715
1716 def test_remove_all(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001717 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001718 os.mkdir(dira)
1719 dirb = os.path.join(dira, 'dirb')
1720 os.mkdir(dirb)
1721 os.removedirs(dirb)
1722 self.assertFalse(os.path.exists(dirb))
1723 self.assertFalse(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001724 self.assertFalse(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001725
1726 def test_remove_partial(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001727 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001728 os.mkdir(dira)
1729 dirb = os.path.join(dira, 'dirb')
1730 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001731 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001732 os.removedirs(dirb)
1733 self.assertFalse(os.path.exists(dirb))
1734 self.assertTrue(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001735 self.assertTrue(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001736
1737 def test_remove_nothing(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001738 dira = os.path.join(os_helper.TESTFN, 'dira')
Andrew Svetlov405faed2012-12-25 12:18:09 +02001739 os.mkdir(dira)
1740 dirb = os.path.join(dira, 'dirb')
1741 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001742 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001743 with self.assertRaises(OSError):
1744 os.removedirs(dirb)
1745 self.assertTrue(os.path.exists(dirb))
1746 self.assertTrue(os.path.exists(dira))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001747 self.assertTrue(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001748
1749
Guido van Rossume7ba4952007-06-06 23:52:48 +00001750class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001751 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001752 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001753 f.write(b'hello')
1754 f.close()
1755 with open(os.devnull, 'rb') as f:
1756 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001757
Andrew Svetlov405faed2012-12-25 12:18:09 +02001758
Guido van Rossume7ba4952007-06-06 23:52:48 +00001759class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001760 def test_urandom_length(self):
1761 self.assertEqual(len(os.urandom(0)), 0)
1762 self.assertEqual(len(os.urandom(1)), 1)
1763 self.assertEqual(len(os.urandom(10)), 10)
1764 self.assertEqual(len(os.urandom(100)), 100)
1765 self.assertEqual(len(os.urandom(1000)), 1000)
1766
1767 def test_urandom_value(self):
1768 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001769 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001770 data2 = os.urandom(16)
1771 self.assertNotEqual(data1, data2)
1772
1773 def get_urandom_subprocess(self, count):
1774 code = '\n'.join((
1775 'import os, sys',
1776 'data = os.urandom(%s)' % count,
1777 'sys.stdout.buffer.write(data)',
1778 'sys.stdout.buffer.flush()'))
1779 out = assert_python_ok('-c', code)
1780 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001781 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001782 return stdout
1783
1784 def test_urandom_subprocess(self):
1785 data1 = self.get_urandom_subprocess(16)
1786 data2 = self.get_urandom_subprocess(16)
1787 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001788
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001789
Victor Stinner9b1f4742016-09-06 16:18:52 -07001790@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1791class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001792 @classmethod
1793 def setUpClass(cls):
1794 try:
1795 os.getrandom(1)
1796 except OSError as exc:
1797 if exc.errno == errno.ENOSYS:
1798 # Python compiled on a more recent Linux version
1799 # than the current Linux kernel
1800 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1801 else:
1802 raise
1803
Victor Stinner9b1f4742016-09-06 16:18:52 -07001804 def test_getrandom_type(self):
1805 data = os.getrandom(16)
1806 self.assertIsInstance(data, bytes)
1807 self.assertEqual(len(data), 16)
1808
1809 def test_getrandom0(self):
1810 empty = os.getrandom(0)
1811 self.assertEqual(empty, b'')
1812
1813 def test_getrandom_random(self):
1814 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1815
1816 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1817 # resource /dev/random
1818
1819 def test_getrandom_nonblock(self):
1820 # The call must not fail. Check also that the flag exists
1821 try:
1822 os.getrandom(1, os.GRND_NONBLOCK)
1823 except BlockingIOError:
1824 # System urandom is not initialized yet
1825 pass
1826
1827 def test_getrandom_value(self):
1828 data1 = os.getrandom(16)
1829 data2 = os.getrandom(16)
1830 self.assertNotEqual(data1, data2)
1831
1832
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001833# os.urandom() doesn't use a file descriptor when it is implemented with the
1834# getentropy() function, the getrandom() function or the getrandom() syscall
1835OS_URANDOM_DONT_USE_FD = (
1836 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1837 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1838 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001839
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001840@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1841 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001842@unittest.skipIf(sys.platform == "vxworks",
1843 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001844class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001845 @unittest.skipUnless(resource, "test requires the resource module")
1846 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001847 # Check urandom() failing when it is not able to open /dev/random.
1848 # We spawn a new process to make the test more robust (if getrlimit()
1849 # failed to restore the file descriptor limit after this, the whole
1850 # test suite would crash; this actually happened on the OS X Tiger
1851 # buildbot).
1852 code = """if 1:
1853 import errno
1854 import os
1855 import resource
1856
1857 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1858 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1859 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001860 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001861 except OSError as e:
1862 assert e.errno == errno.EMFILE, e.errno
1863 else:
1864 raise AssertionError("OSError not raised")
1865 """
1866 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001867
Antoine Pitroue472aea2014-04-26 14:33:03 +02001868 def test_urandom_fd_closed(self):
1869 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1870 # closed.
1871 code = """if 1:
1872 import os
1873 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001874 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001875 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001876 with test.support.SuppressCrashReport():
1877 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001878 sys.stdout.buffer.write(os.urandom(4))
1879 """
1880 rc, out, err = assert_python_ok('-Sc', code)
1881
1882 def test_urandom_fd_reopened(self):
1883 # Issue #21207: urandom() should detect its fd to /dev/urandom
1884 # changed to something else, and reopen it.
Hai Shi0c4f0f32020-06-30 21:46:31 +08001885 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1886 create_file(os_helper.TESTFN, b"x" * 256)
Victor Stinnerae39d232016-03-24 17:12:55 +01001887
Antoine Pitroue472aea2014-04-26 14:33:03 +02001888 code = """if 1:
1889 import os
1890 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001891 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001892 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001893 with test.support.SuppressCrashReport():
1894 for fd in range(3, 256):
1895 try:
1896 os.close(fd)
1897 except OSError:
1898 pass
1899 else:
1900 # Found the urandom fd (XXX hopefully)
1901 break
1902 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001903 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001904 new_fd = f.fileno()
1905 # Issue #26935: posix allows new_fd and fd to be equal but
1906 # some libc implementations have dup2 return an error in this
1907 # case.
1908 if new_fd != fd:
1909 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001910 sys.stdout.buffer.write(os.urandom(4))
1911 sys.stdout.buffer.write(os.urandom(4))
Hai Shi0c4f0f32020-06-30 21:46:31 +08001912 """.format(TESTFN=os_helper.TESTFN)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001913 rc, out, err = assert_python_ok('-Sc', code)
1914 self.assertEqual(len(out), 8)
1915 self.assertNotEqual(out[0:4], out[4:8])
1916 rc, out2, err2 = assert_python_ok('-Sc', code)
1917 self.assertEqual(len(out2), 8)
1918 self.assertNotEqual(out2, out)
1919
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001920
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001921@contextlib.contextmanager
1922def _execvpe_mockup(defpath=None):
1923 """
1924 Stubs out execv and execve functions when used as context manager.
1925 Records exec calls. The mock execv and execve functions always raise an
1926 exception as they would normally never return.
1927 """
1928 # A list of tuples containing (function name, first arg, args)
1929 # of calls to execv or execve that have been made.
1930 calls = []
1931
1932 def mock_execv(name, *args):
1933 calls.append(('execv', name, args))
1934 raise RuntimeError("execv called")
1935
1936 def mock_execve(name, *args):
1937 calls.append(('execve', name, args))
1938 raise OSError(errno.ENOTDIR, "execve called")
1939
1940 try:
1941 orig_execv = os.execv
1942 orig_execve = os.execve
1943 orig_defpath = os.defpath
1944 os.execv = mock_execv
1945 os.execve = mock_execve
1946 if defpath is not None:
1947 os.defpath = defpath
1948 yield calls
1949 finally:
1950 os.execv = orig_execv
1951 os.execve = orig_execve
1952 os.defpath = orig_defpath
1953
pxinwrf2d7ac72019-05-21 18:46:37 +08001954@unittest.skipUnless(hasattr(os, 'execv'),
1955 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001956class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001957 @unittest.skipIf(USING_LINUXTHREADS,
1958 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001959 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001960 self.assertRaises(OSError, os.execvpe, 'no such app-',
1961 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001962
Steve Dowerbce26262016-11-19 19:17:26 -08001963 def test_execv_with_bad_arglist(self):
1964 self.assertRaises(ValueError, os.execv, 'notepad', ())
1965 self.assertRaises(ValueError, os.execv, 'notepad', [])
1966 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1967 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1968
Thomas Heller6790d602007-08-30 17:15:14 +00001969 def test_execvpe_with_bad_arglist(self):
1970 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001971 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1972 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001973
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001974 @unittest.skipUnless(hasattr(os, '_execvpe'),
1975 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001976 def _test_internal_execvpe(self, test_type):
1977 program_path = os.sep + 'absolutepath'
1978 if test_type is bytes:
1979 program = b'executable'
1980 fullpath = os.path.join(os.fsencode(program_path), program)
1981 native_fullpath = fullpath
1982 arguments = [b'progname', 'arg1', 'arg2']
1983 else:
1984 program = 'executable'
1985 arguments = ['progname', 'arg1', 'arg2']
1986 fullpath = os.path.join(program_path, program)
1987 if os.name != "nt":
1988 native_fullpath = os.fsencode(fullpath)
1989 else:
1990 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001991 env = {'spam': 'beans'}
1992
Victor Stinnerb745a742010-05-18 17:17:23 +00001993 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001994 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001995 self.assertRaises(RuntimeError,
1996 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001997 self.assertEqual(len(calls), 1)
1998 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1999
Victor Stinnerb745a742010-05-18 17:17:23 +00002000 # test os._execvpe() with a relative path:
2001 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002002 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00002003 self.assertRaises(OSError,
2004 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002005 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00002006 self.assertSequenceEqual(calls[0],
2007 ('execve', native_fullpath, (arguments, env)))
2008
2009 # test os._execvpe() with a relative path:
2010 # os.get_exec_path() reads the 'PATH' variable
2011 with _execvpe_mockup() as calls:
2012 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00002013 if test_type is bytes:
2014 env_path[b'PATH'] = program_path
2015 else:
2016 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00002017 self.assertRaises(OSError,
2018 os._execvpe, program, arguments, env=env_path)
2019 self.assertEqual(len(calls), 1)
2020 self.assertSequenceEqual(calls[0],
2021 ('execve', native_fullpath, (arguments, env_path)))
2022
2023 def test_internal_execvpe_str(self):
2024 self._test_internal_execvpe(str)
2025 if os.name != "nt":
2026 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002027
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002028 def test_execve_invalid_env(self):
2029 args = [sys.executable, '-c', 'pass']
2030
Ville Skyttä49b27342017-08-03 09:00:59 +03002031 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002032 newenv = os.environ.copy()
2033 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2034 with self.assertRaises(ValueError):
2035 os.execve(args[0], args, newenv)
2036
Ville Skyttä49b27342017-08-03 09:00:59 +03002037 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002038 newenv = os.environ.copy()
2039 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2040 with self.assertRaises(ValueError):
2041 os.execve(args[0], args, newenv)
2042
Ville Skyttä49b27342017-08-03 09:00:59 +03002043 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002044 newenv = os.environ.copy()
2045 newenv["FRUIT=ORANGE"] = "lemon"
2046 with self.assertRaises(ValueError):
2047 os.execve(args[0], args, newenv)
2048
Alexey Izbyshev83460312018-10-20 03:28:22 +03002049 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
2050 def test_execve_with_empty_path(self):
2051 # bpo-32890: Check GetLastError() misuse
2052 try:
2053 os.execve('', ['arg'], {})
2054 except OSError as e:
2055 self.assertTrue(e.winerror is None or e.winerror != 0)
2056 else:
2057 self.fail('No OSError raised')
2058
Gregory P. Smith4ae37772010-05-08 18:05:46 +00002059
Serhiy Storchaka43767632013-11-03 21:31:38 +02002060@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00002061class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01002062 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01002063 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002064 os.stat(os_helper.TESTFN)
Victor Stinner32830142016-03-25 15:12:08 +01002065 except FileNotFoundError:
2066 exists = False
2067 except OSError as exc:
2068 exists = True
2069 self.fail("file %s must not exist; os.stat failed with %s"
Hai Shi0c4f0f32020-06-30 21:46:31 +08002070 % (os_helper.TESTFN, exc))
Victor Stinner32830142016-03-25 15:12:08 +01002071 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002072 self.fail("file %s must not exist" % os_helper.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01002073
Thomas Wouters477c8d52006-05-27 19:21:47 +00002074 def test_rename(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002075 self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00002076
2077 def test_remove(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002078 self.assertRaises(OSError, os.remove, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002079
2080 def test_chdir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002081 self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002082
2083 def test_mkdir(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002084 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002085
Hai Shi0c4f0f32020-06-30 21:46:31 +08002086 with open(os_helper.TESTFN, "x") as f:
2087 self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002088
2089 def test_utime(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002090 self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002091
Thomas Wouters477c8d52006-05-27 19:21:47 +00002092 def test_chmod(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002093 self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002094
Victor Stinnere77c9742016-03-25 10:28:23 +01002095
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002096class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00002097 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002098 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2099 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07002100 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002101 def get_single(f):
2102 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00002103 if hasattr(os, f):
2104 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002105 return helper
2106 for f in singles:
2107 locals()["test_"+f] = get_single(f)
2108
Benjamin Peterson7522c742009-01-19 21:00:09 +00002109 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002110 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002111 f(os_helper.make_bad_fd(), *args)
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002112 except OSError as e:
2113 self.assertEqual(e.errno, errno.EBADF)
2114 else:
Martin Panter7462b6492015-11-02 03:37:02 +00002115 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002116 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00002117
Serhiy Storchaka43767632013-11-03 21:31:38 +02002118 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002119 def test_isatty(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002120 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002121
Serhiy Storchaka43767632013-11-03 21:31:38 +02002122 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002123 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002124 fd = os_helper.make_bad_fd()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002125 # Make sure none of the descriptors we are about to close are
2126 # currently valid (issue 6542).
2127 for i in range(10):
2128 try: os.fstat(fd+i)
2129 except OSError:
2130 pass
2131 else:
2132 break
2133 if i < 2:
2134 raise unittest.SkipTest(
2135 "Unable to acquire a range of invalid file descriptors")
2136 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002137
Serhiy Storchaka43767632013-11-03 21:31:38 +02002138 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002139 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002140 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002141
Serhiy Storchaka43767632013-11-03 21:31:38 +02002142 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002143 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002144 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002145
Serhiy Storchaka43767632013-11-03 21:31:38 +02002146 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002147 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002148 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002149
Serhiy Storchaka43767632013-11-03 21:31:38 +02002150 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002151 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002152 self.check(os.pathconf, "PC_NAME_MAX")
2153 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002154
Serhiy Storchaka43767632013-11-03 21:31:38 +02002155 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002156 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002157 self.check(os.truncate, 0)
2158 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002159
Serhiy Storchaka43767632013-11-03 21:31:38 +02002160 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002161 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002162 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002163
Serhiy Storchaka43767632013-11-03 21:31:38 +02002164 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002165 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002166 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002167
Victor Stinner57ddf782014-01-08 15:21:28 +01002168 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2169 def test_readv(self):
2170 buf = bytearray(10)
2171 self.check(os.readv, [buf])
2172
Serhiy Storchaka43767632013-11-03 21:31:38 +02002173 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002174 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002175 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002176
Serhiy Storchaka43767632013-11-03 21:31:38 +02002177 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002178 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002179 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002180
Victor Stinner57ddf782014-01-08 15:21:28 +01002181 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2182 def test_writev(self):
2183 self.check(os.writev, [b'abc'])
2184
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002185 def test_inheritable(self):
2186 self.check(os.get_inheritable)
2187 self.check(os.set_inheritable, True)
2188
2189 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2190 'needs os.get_blocking() and os.set_blocking()')
2191 def test_blocking(self):
2192 self.check(os.get_blocking)
2193 self.check(os.set_blocking, True)
2194
Brian Curtin1b9df392010-11-24 20:24:31 +00002195
2196class LinkTests(unittest.TestCase):
2197 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002198 self.file1 = os_helper.TESTFN
2199 self.file2 = os.path.join(os_helper.TESTFN + "2")
Brian Curtin1b9df392010-11-24 20:24:31 +00002200
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002201 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002202 for file in (self.file1, self.file2):
2203 if os.path.exists(file):
2204 os.unlink(file)
2205
Brian Curtin1b9df392010-11-24 20:24:31 +00002206 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002207 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002208
xdegaye6a55d092017-11-12 17:57:04 +01002209 try:
2210 os.link(file1, file2)
2211 except PermissionError as e:
2212 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00002213 with open(file1, "r") as f1, open(file2, "r") as f2:
2214 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2215
2216 def test_link(self):
2217 self._test_link(self.file1, self.file2)
2218
2219 def test_link_bytes(self):
2220 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2221 bytes(self.file2, sys.getfilesystemencoding()))
2222
Brian Curtinf498b752010-11-30 15:54:04 +00002223 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002224 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002225 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002226 except UnicodeError:
2227 raise unittest.SkipTest("Unable to encode for this platform.")
2228
Brian Curtinf498b752010-11-30 15:54:04 +00002229 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002230 self.file2 = self.file1 + "2"
2231 self._test_link(self.file1, self.file2)
2232
Serhiy Storchaka43767632013-11-03 21:31:38 +02002233@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2234class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002235 # uid_t and gid_t are 32-bit unsigned integers on Linux
2236 UID_OVERFLOW = (1 << 32)
2237 GID_OVERFLOW = (1 << 32)
2238
Serhiy Storchaka43767632013-11-03 21:31:38 +02002239 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2240 def test_setuid(self):
2241 if os.getuid() != 0:
2242 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002243 self.assertRaises(TypeError, os.setuid, 'not an int')
2244 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002245
Serhiy Storchaka43767632013-11-03 21:31:38 +02002246 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2247 def test_setgid(self):
2248 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2249 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002250 self.assertRaises(TypeError, os.setgid, 'not an int')
2251 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002252
Serhiy Storchaka43767632013-11-03 21:31:38 +02002253 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2254 def test_seteuid(self):
2255 if os.getuid() != 0:
2256 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002257 self.assertRaises(TypeError, os.setegid, 'not an int')
2258 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002259
Serhiy Storchaka43767632013-11-03 21:31:38 +02002260 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2261 def test_setegid(self):
2262 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2263 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002264 self.assertRaises(TypeError, os.setegid, 'not an int')
2265 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002266
Serhiy Storchaka43767632013-11-03 21:31:38 +02002267 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2268 def test_setreuid(self):
2269 if os.getuid() != 0:
2270 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002271 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2272 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2273 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2274 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002275
Serhiy Storchaka43767632013-11-03 21:31:38 +02002276 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2277 def test_setreuid_neg1(self):
2278 # Needs to accept -1. We run this in a subprocess to avoid
2279 # altering the test runner's process state (issue8045).
2280 subprocess.check_call([
2281 sys.executable, '-c',
2282 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002283
Serhiy Storchaka43767632013-11-03 21:31:38 +02002284 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2285 def test_setregid(self):
2286 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2287 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002288 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2289 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2290 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2291 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002292
Serhiy Storchaka43767632013-11-03 21:31:38 +02002293 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2294 def test_setregid_neg1(self):
2295 # Needs to accept -1. We run this in a subprocess to avoid
2296 # altering the test runner's process state (issue8045).
2297 subprocess.check_call([
2298 sys.executable, '-c',
2299 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002300
Serhiy Storchaka43767632013-11-03 21:31:38 +02002301@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2302class Pep383Tests(unittest.TestCase):
2303 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002304 if os_helper.TESTFN_UNENCODABLE:
2305 self.dir = os_helper.TESTFN_UNENCODABLE
2306 elif os_helper.TESTFN_NONASCII:
2307 self.dir = os_helper.TESTFN_NONASCII
Serhiy Storchaka43767632013-11-03 21:31:38 +02002308 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002309 self.dir = os_helper.TESTFN
Serhiy Storchaka43767632013-11-03 21:31:38 +02002310 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002311
Serhiy Storchaka43767632013-11-03 21:31:38 +02002312 bytesfn = []
2313 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002314 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002315 fn = os.fsencode(fn)
2316 except UnicodeEncodeError:
2317 return
2318 bytesfn.append(fn)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002319 add_filename(os_helper.TESTFN_UNICODE)
2320 if os_helper.TESTFN_UNENCODABLE:
2321 add_filename(os_helper.TESTFN_UNENCODABLE)
2322 if os_helper.TESTFN_NONASCII:
2323 add_filename(os_helper.TESTFN_NONASCII)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002324 if not bytesfn:
2325 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002326
Serhiy Storchaka43767632013-11-03 21:31:38 +02002327 self.unicodefn = set()
2328 os.mkdir(self.dir)
2329 try:
2330 for fn in bytesfn:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002331 os_helper.create_empty_file(os.path.join(self.bdir, fn))
Serhiy Storchaka43767632013-11-03 21:31:38 +02002332 fn = os.fsdecode(fn)
2333 if fn in self.unicodefn:
2334 raise ValueError("duplicate filename")
2335 self.unicodefn.add(fn)
2336 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002337 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002338 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002339
Serhiy Storchaka43767632013-11-03 21:31:38 +02002340 def tearDown(self):
2341 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002342
Serhiy Storchaka43767632013-11-03 21:31:38 +02002343 def test_listdir(self):
2344 expected = self.unicodefn
2345 found = set(os.listdir(self.dir))
2346 self.assertEqual(found, expected)
2347 # test listdir without arguments
2348 current_directory = os.getcwd()
2349 try:
2350 os.chdir(os.sep)
2351 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2352 finally:
2353 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002354
Serhiy Storchaka43767632013-11-03 21:31:38 +02002355 def test_open(self):
2356 for fn in self.unicodefn:
2357 f = open(os.path.join(self.dir, fn), 'rb')
2358 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002359
Serhiy Storchaka43767632013-11-03 21:31:38 +02002360 @unittest.skipUnless(hasattr(os, 'statvfs'),
2361 "need os.statvfs()")
2362 def test_statvfs(self):
2363 # issue #9645
2364 for fn in self.unicodefn:
2365 # should not fail with file not found error
2366 fullname = os.path.join(self.dir, fn)
2367 os.statvfs(fullname)
2368
2369 def test_stat(self):
2370 for fn in self.unicodefn:
2371 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002372
Brian Curtineb24d742010-04-12 17:16:38 +00002373@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2374class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002375 def _kill(self, sig):
2376 # Start sys.executable as a subprocess and communicate from the
2377 # subprocess to the parent that the interpreter is ready. When it
2378 # becomes ready, send *sig* via os.kill to the subprocess and check
2379 # that the return code is equal to *sig*.
2380 import ctypes
2381 from ctypes import wintypes
2382 import msvcrt
2383
2384 # Since we can't access the contents of the process' stdout until the
2385 # process has exited, use PeekNamedPipe to see what's inside stdout
2386 # without waiting. This is done so we can tell that the interpreter
2387 # is started and running at a point where it could handle a signal.
2388 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2389 PeekNamedPipe.restype = wintypes.BOOL
2390 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2391 ctypes.POINTER(ctypes.c_char), # stdout buf
2392 wintypes.DWORD, # Buffer size
2393 ctypes.POINTER(wintypes.DWORD), # bytes read
2394 ctypes.POINTER(wintypes.DWORD), # bytes avail
2395 ctypes.POINTER(wintypes.DWORD)) # bytes left
2396 msg = "running"
2397 proc = subprocess.Popen([sys.executable, "-c",
2398 "import sys;"
2399 "sys.stdout.write('{}');"
2400 "sys.stdout.flush();"
2401 "input()".format(msg)],
2402 stdout=subprocess.PIPE,
2403 stderr=subprocess.PIPE,
2404 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002405 self.addCleanup(proc.stdout.close)
2406 self.addCleanup(proc.stderr.close)
2407 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002408
2409 count, max = 0, 100
2410 while count < max and proc.poll() is None:
2411 # Create a string buffer to store the result of stdout from the pipe
2412 buf = ctypes.create_string_buffer(len(msg))
2413 # Obtain the text currently in proc.stdout
2414 # Bytes read/avail/left are left as NULL and unused
2415 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2416 buf, ctypes.sizeof(buf), None, None, None)
2417 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2418 if buf.value:
2419 self.assertEqual(msg, buf.value.decode())
2420 break
2421 time.sleep(0.1)
2422 count += 1
2423 else:
2424 self.fail("Did not receive communication from the subprocess")
2425
Brian Curtineb24d742010-04-12 17:16:38 +00002426 os.kill(proc.pid, sig)
2427 self.assertEqual(proc.wait(), sig)
2428
2429 def test_kill_sigterm(self):
2430 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002431 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002432
2433 def test_kill_int(self):
2434 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002435 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002436
2437 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002438 tagname = "test_os_%s" % uuid.uuid1()
2439 m = mmap.mmap(-1, 1, tagname)
2440 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002441 # Run a script which has console control handling enabled.
2442 proc = subprocess.Popen([sys.executable,
2443 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002444 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002445 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2446 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002447 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002448 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002449 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002450 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002451 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002452 count += 1
2453 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002454 # Forcefully kill the process if we weren't able to signal it.
2455 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002456 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002457 os.kill(proc.pid, event)
2458 # proc.send_signal(event) could also be done here.
2459 # Allow time for the signal to be passed and the process to exit.
2460 time.sleep(0.5)
2461 if not proc.poll():
2462 # Forcefully kill the process if we weren't able to signal it.
2463 os.kill(proc.pid, signal.SIGINT)
2464 self.fail("subprocess did not stop on {}".format(name))
2465
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002466 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002467 def test_CTRL_C_EVENT(self):
2468 from ctypes import wintypes
2469 import ctypes
2470
2471 # Make a NULL value by creating a pointer with no argument.
2472 NULL = ctypes.POINTER(ctypes.c_int)()
2473 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2474 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2475 wintypes.BOOL)
2476 SetConsoleCtrlHandler.restype = wintypes.BOOL
2477
2478 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002479 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002480 # by subprocesses.
2481 SetConsoleCtrlHandler(NULL, 0)
2482
2483 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2484
2485 def test_CTRL_BREAK_EVENT(self):
2486 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2487
2488
Brian Curtind40e6f72010-07-08 21:39:08 +00002489@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002490class Win32ListdirTests(unittest.TestCase):
2491 """Test listdir on Windows."""
2492
2493 def setUp(self):
2494 self.created_paths = []
2495 for i in range(2):
2496 dir_name = 'SUB%d' % i
Hai Shi0c4f0f32020-06-30 21:46:31 +08002497 dir_path = os.path.join(os_helper.TESTFN, dir_name)
Tim Golden781bbeb2013-10-25 20:24:06 +01002498 file_name = 'FILE%d' % i
Hai Shi0c4f0f32020-06-30 21:46:31 +08002499 file_path = os.path.join(os_helper.TESTFN, file_name)
Tim Golden781bbeb2013-10-25 20:24:06 +01002500 os.makedirs(dir_path)
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03002501 with open(file_path, 'w', encoding='utf-8') as f:
Tim Golden781bbeb2013-10-25 20:24:06 +01002502 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2503 self.created_paths.extend([dir_name, file_name])
2504 self.created_paths.sort()
2505
2506 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002507 shutil.rmtree(os_helper.TESTFN)
Tim Golden781bbeb2013-10-25 20:24:06 +01002508
2509 def test_listdir_no_extended_path(self):
2510 """Test when the path is not an "extended" path."""
2511 # unicode
2512 self.assertEqual(
Hai Shi0c4f0f32020-06-30 21:46:31 +08002513 sorted(os.listdir(os_helper.TESTFN)),
Tim Golden781bbeb2013-10-25 20:24:06 +01002514 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002515
Tim Golden781bbeb2013-10-25 20:24:06 +01002516 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002517 self.assertEqual(
Hai Shi0c4f0f32020-06-30 21:46:31 +08002518 sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
Steve Dowercc16be82016-09-08 10:35:16 -07002519 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002520
2521 def test_listdir_extended_path(self):
2522 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002523 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002524 # unicode
Hai Shi0c4f0f32020-06-30 21:46:31 +08002525 path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
Tim Golden781bbeb2013-10-25 20:24:06 +01002526 self.assertEqual(
2527 sorted(os.listdir(path)),
2528 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002529
Tim Golden781bbeb2013-10-25 20:24:06 +01002530 # bytes
Hai Shi0c4f0f32020-06-30 21:46:31 +08002531 path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
Steve Dowercc16be82016-09-08 10:35:16 -07002532 self.assertEqual(
2533 sorted(os.listdir(path)),
2534 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002535
2536
Berker Peksage0b5b202018-08-15 13:03:41 +03002537@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2538class ReadlinkTests(unittest.TestCase):
2539 filelink = 'readlinktest'
2540 filelink_target = os.path.abspath(__file__)
2541 filelinkb = os.fsencode(filelink)
2542 filelinkb_target = os.fsencode(filelink_target)
2543
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002544 def assertPathEqual(self, left, right):
2545 left = os.path.normcase(left)
2546 right = os.path.normcase(right)
2547 if sys.platform == 'win32':
2548 # Bad practice to blindly strip the prefix as it may be required to
2549 # correctly refer to the file, but we're only comparing paths here.
2550 has_prefix = lambda p: p.startswith(
2551 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2552 if has_prefix(left):
2553 left = left[4:]
2554 if has_prefix(right):
2555 right = right[4:]
2556 self.assertEqual(left, right)
2557
Berker Peksage0b5b202018-08-15 13:03:41 +03002558 def setUp(self):
2559 self.assertTrue(os.path.exists(self.filelink_target))
2560 self.assertTrue(os.path.exists(self.filelinkb_target))
2561 self.assertFalse(os.path.exists(self.filelink))
2562 self.assertFalse(os.path.exists(self.filelinkb))
2563
2564 def test_not_symlink(self):
2565 filelink_target = FakePath(self.filelink_target)
2566 self.assertRaises(OSError, os.readlink, self.filelink_target)
2567 self.assertRaises(OSError, os.readlink, filelink_target)
2568
2569 def test_missing_link(self):
2570 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2571 self.assertRaises(FileNotFoundError, os.readlink,
2572 FakePath('missing-link'))
2573
Hai Shi0c4f0f32020-06-30 21:46:31 +08002574 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002575 def test_pathlike(self):
2576 os.symlink(self.filelink_target, self.filelink)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002577 self.addCleanup(os_helper.unlink, self.filelink)
Berker Peksage0b5b202018-08-15 13:03:41 +03002578 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002579 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002580
Hai Shi0c4f0f32020-06-30 21:46:31 +08002581 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002582 def test_pathlike_bytes(self):
2583 os.symlink(self.filelinkb_target, self.filelinkb)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002584 self.addCleanup(os_helper.unlink, self.filelinkb)
Berker Peksage0b5b202018-08-15 13:03:41 +03002585 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002586 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002587 self.assertIsInstance(path, bytes)
2588
Hai Shi0c4f0f32020-06-30 21:46:31 +08002589 @os_helper.skip_unless_symlink
Berker Peksage0b5b202018-08-15 13:03:41 +03002590 def test_bytes(self):
2591 os.symlink(self.filelinkb_target, self.filelinkb)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002592 self.addCleanup(os_helper.unlink, self.filelinkb)
Berker Peksage0b5b202018-08-15 13:03:41 +03002593 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002594 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002595 self.assertIsInstance(path, bytes)
2596
2597
Tim Golden781bbeb2013-10-25 20:24:06 +01002598@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002599@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002600class Win32SymlinkTests(unittest.TestCase):
2601 filelink = 'filelinktest'
2602 filelink_target = os.path.abspath(__file__)
2603 dirlink = 'dirlinktest'
2604 dirlink_target = os.path.dirname(filelink_target)
2605 missing_link = 'missing link'
2606
2607 def setUp(self):
2608 assert os.path.exists(self.dirlink_target)
2609 assert os.path.exists(self.filelink_target)
2610 assert not os.path.exists(self.dirlink)
2611 assert not os.path.exists(self.filelink)
2612 assert not os.path.exists(self.missing_link)
2613
2614 def tearDown(self):
2615 if os.path.exists(self.filelink):
2616 os.remove(self.filelink)
2617 if os.path.exists(self.dirlink):
2618 os.rmdir(self.dirlink)
2619 if os.path.lexists(self.missing_link):
2620 os.remove(self.missing_link)
2621
2622 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002623 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002624 self.assertTrue(os.path.exists(self.dirlink))
2625 self.assertTrue(os.path.isdir(self.dirlink))
2626 self.assertTrue(os.path.islink(self.dirlink))
2627 self.check_stat(self.dirlink, self.dirlink_target)
2628
2629 def test_file_link(self):
2630 os.symlink(self.filelink_target, self.filelink)
2631 self.assertTrue(os.path.exists(self.filelink))
2632 self.assertTrue(os.path.isfile(self.filelink))
2633 self.assertTrue(os.path.islink(self.filelink))
2634 self.check_stat(self.filelink, self.filelink_target)
2635
2636 def _create_missing_dir_link(self):
2637 'Create a "directory" link to a non-existent target'
2638 linkname = self.missing_link
2639 if os.path.lexists(linkname):
2640 os.remove(linkname)
2641 target = r'c:\\target does not exist.29r3c740'
2642 assert not os.path.exists(target)
2643 target_is_dir = True
2644 os.symlink(target, linkname, target_is_dir)
2645
2646 def test_remove_directory_link_to_missing_target(self):
2647 self._create_missing_dir_link()
2648 # For compatibility with Unix, os.remove will check the
2649 # directory status and call RemoveDirectory if the symlink
2650 # was created with target_is_dir==True.
2651 os.remove(self.missing_link)
2652
Brian Curtind40e6f72010-07-08 21:39:08 +00002653 def test_isdir_on_directory_link_to_missing_target(self):
2654 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002655 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002656
Brian Curtind40e6f72010-07-08 21:39:08 +00002657 def test_rmdir_on_directory_link_to_missing_target(self):
2658 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002659 os.rmdir(self.missing_link)
2660
2661 def check_stat(self, link, target):
2662 self.assertEqual(os.stat(link), os.stat(target))
2663 self.assertNotEqual(os.lstat(link), os.stat(link))
2664
Brian Curtind25aef52011-06-13 15:16:04 -05002665 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002666 self.assertEqual(os.stat(bytes_link), os.stat(target))
2667 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002668
2669 def test_12084(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002670 level1 = os.path.abspath(os_helper.TESTFN)
Brian Curtind25aef52011-06-13 15:16:04 -05002671 level2 = os.path.join(level1, "level2")
2672 level3 = os.path.join(level2, "level3")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002673 self.addCleanup(os_helper.rmtree, level1)
Victor Stinnerae39d232016-03-24 17:12:55 +01002674
2675 os.mkdir(level1)
2676 os.mkdir(level2)
2677 os.mkdir(level3)
2678
2679 file1 = os.path.abspath(os.path.join(level1, "file1"))
2680 create_file(file1)
2681
2682 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002683 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002684 os.chdir(level2)
2685 link = os.path.join(level2, "link")
2686 os.symlink(os.path.relpath(file1), "link")
2687 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002688
Victor Stinnerae39d232016-03-24 17:12:55 +01002689 # Check os.stat calls from the same dir as the link
2690 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002691
Victor Stinnerae39d232016-03-24 17:12:55 +01002692 # Check os.stat calls from a dir below the link
2693 os.chdir(level1)
2694 self.assertEqual(os.stat(file1),
2695 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002696
Victor Stinnerae39d232016-03-24 17:12:55 +01002697 # Check os.stat calls from a dir above the link
2698 os.chdir(level3)
2699 self.assertEqual(os.stat(file1),
2700 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002701 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002702 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002703
SSE43c34aad2018-02-13 00:10:35 +07002704 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2705 and os.path.exists(r'C:\ProgramData'),
2706 'Test directories not found')
2707 def test_29248(self):
2708 # os.symlink() calls CreateSymbolicLink, which creates
2709 # the reparse data buffer with the print name stored
2710 # first, so the offset is always 0. CreateSymbolicLink
2711 # stores the "PrintName" DOS path (e.g. "C:\") first,
2712 # with an offset of 0, followed by the "SubstituteName"
2713 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2714 # the other hand, seems to have been created manually
2715 # with an inverted order.
2716 target = os.readlink(r'C:\Users\All Users')
2717 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2718
Steve Dower6921e732018-03-05 14:26:08 -08002719 def test_buffer_overflow(self):
2720 # Older versions would have a buffer overflow when detecting
2721 # whether a link source was a directory. This test ensures we
2722 # no longer crash, but does not otherwise validate the behavior
2723 segment = 'X' * 27
2724 path = os.path.join(*[segment] * 10)
2725 test_cases = [
2726 # overflow with absolute src
2727 ('\\' + path, segment),
2728 # overflow dest with relative src
2729 (segment, path),
2730 # overflow when joining src
2731 (path[:180], path[:180]),
2732 ]
2733 for src, dest in test_cases:
2734 try:
2735 os.symlink(src, dest)
2736 except FileNotFoundError:
2737 pass
2738 else:
2739 try:
2740 os.remove(dest)
2741 except OSError:
2742 pass
2743 # Also test with bytes, since that is a separate code path.
2744 try:
2745 os.symlink(os.fsencode(src), os.fsencode(dest))
2746 except FileNotFoundError:
2747 pass
2748 else:
2749 try:
2750 os.remove(dest)
2751 except OSError:
2752 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002753
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002754 def test_appexeclink(self):
2755 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002756 if not os.path.isdir(root):
2757 self.skipTest("test requires a WindowsApps directory")
2758
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002759 aliases = [os.path.join(root, a)
2760 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2761
2762 for alias in aliases:
2763 if support.verbose:
2764 print()
2765 print("Testing with", alias)
2766 st = os.lstat(alias)
2767 self.assertEqual(st, os.stat(alias))
2768 self.assertFalse(stat.S_ISLNK(st.st_mode))
2769 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2770 # testing the first one we see is sufficient
2771 break
2772 else:
2773 self.skipTest("test requires an app execution alias")
2774
Tim Golden0321cf22014-05-05 19:46:17 +01002775@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2776class Win32JunctionTests(unittest.TestCase):
2777 junction = 'junctiontest'
2778 junction_target = os.path.dirname(os.path.abspath(__file__))
2779
2780 def setUp(self):
2781 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002782 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002783
2784 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002785 if os.path.lexists(self.junction):
2786 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002787
2788 def test_create_junction(self):
2789 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002790 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002791 self.assertTrue(os.path.exists(self.junction))
2792 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002793 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2794 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002795
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002796 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002797 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002798 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2799 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002800
2801 def test_unlink_removes_junction(self):
2802 _winapi.CreateJunction(self.junction_target, self.junction)
2803 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002804 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002805
2806 os.unlink(self.junction)
2807 self.assertFalse(os.path.exists(self.junction))
2808
Mark Becwarb82bfac2019-02-02 16:08:23 -05002809@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2810class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002811 def test_getfinalpathname_handles(self):
Hai Shid94af3f2020-08-08 17:32:41 +08002812 nt = import_helper.import_module('nt')
2813 ctypes = import_helper.import_module('ctypes')
Berker Peksag6ef726a2019-04-22 18:46:28 +03002814 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002815
2816 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2817 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2818
2819 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2820 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2821 ctypes.wintypes.LPDWORD)
2822
2823 # This is a pseudo-handle that doesn't need to be closed
2824 hproc = kernel.GetCurrentProcess()
2825
2826 handle_count = ctypes.wintypes.DWORD()
2827 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2828 self.assertEqual(1, ok)
2829
2830 before_count = handle_count.value
2831
2832 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002833 filenames = [
2834 r'\\?\C:',
2835 r'\\?\NUL',
2836 r'\\?\CONIN',
2837 __file__,
2838 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002839
Berker Peksag6ef726a2019-04-22 18:46:28 +03002840 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002841 for name in filenames:
2842 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002843 nt._getfinalpathname(name)
2844 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002845 # Failure is expected
2846 pass
2847 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002848 os.stat(name)
2849 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002850 pass
2851
2852 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2853 self.assertEqual(1, ok)
2854
2855 handle_delta = handle_count.value - before_count
2856
2857 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002858
Hai Shi0c4f0f32020-06-30 21:46:31 +08002859@os_helper.skip_unless_symlink
Jason R. Coombs3a092862013-05-27 23:21:28 -04002860class NonLocalSymlinkTests(unittest.TestCase):
2861
2862 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002863 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002864 Create this structure:
2865
2866 base
2867 \___ some_dir
2868 """
2869 os.makedirs('base/some_dir')
2870
2871 def tearDown(self):
2872 shutil.rmtree('base')
2873
2874 def test_directory_link_nonlocal(self):
2875 """
2876 The symlink target should resolve relative to the link, not relative
2877 to the current directory.
2878
2879 Then, link base/some_link -> base/some_dir and ensure that some_link
2880 is resolved as a directory.
2881
2882 In issue13772, it was discovered that directory detection failed if
2883 the symlink target was not specified relative to the current
2884 directory, which was a defect in the implementation.
2885 """
2886 src = os.path.join('base', 'some_link')
2887 os.symlink('some_dir', src)
2888 assert os.path.isdir(src)
2889
2890
Victor Stinnere8d51452010-08-19 01:05:19 +00002891class FSEncodingTests(unittest.TestCase):
2892 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002893 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2894 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002895
Victor Stinnere8d51452010-08-19 01:05:19 +00002896 def test_identity(self):
2897 # assert fsdecode(fsencode(x)) == x
2898 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2899 try:
2900 bytesfn = os.fsencode(fn)
2901 except UnicodeEncodeError:
2902 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002903 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002904
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002905
Brett Cannonefb00c02012-02-29 18:31:31 -05002906
2907class DeviceEncodingTests(unittest.TestCase):
2908
2909 def test_bad_fd(self):
2910 # Return None when an fd doesn't actually exist.
2911 self.assertIsNone(os.device_encoding(123456))
2912
Paul Monson62dfd7d2019-04-25 11:36:45 -07002913 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002914 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002915 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002916 def test_device_encoding(self):
2917 encoding = os.device_encoding(0)
2918 self.assertIsNotNone(encoding)
2919 self.assertTrue(codecs.lookup(encoding))
2920
2921
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002922class PidTests(unittest.TestCase):
2923 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2924 def test_getppid(self):
2925 p = subprocess.Popen([sys.executable, '-c',
2926 'import os; print(os.getppid())'],
2927 stdout=subprocess.PIPE)
2928 stdout, _ = p.communicate()
2929 # We are the parent of our subprocess
2930 self.assertEqual(int(stdout), os.getpid())
2931
Victor Stinner9bee32b2020-04-22 16:30:35 +02002932 def check_waitpid(self, code, exitcode, callback=None):
2933 if sys.platform == 'win32':
2934 # On Windows, os.spawnv() simply joins arguments with spaces:
2935 # arguments need to be quoted
2936 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2937 else:
2938 args = [sys.executable, '-c', code]
2939 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002940
Victor Stinner9bee32b2020-04-22 16:30:35 +02002941 if callback is not None:
2942 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002943
Victor Stinner9bee32b2020-04-22 16:30:35 +02002944 # don't use support.wait_process() to test directly os.waitpid()
2945 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002946 pid2, status = os.waitpid(pid, 0)
2947 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2948 self.assertEqual(pid2, pid)
2949
Victor Stinner9bee32b2020-04-22 16:30:35 +02002950 def test_waitpid(self):
2951 self.check_waitpid(code='pass', exitcode=0)
2952
2953 def test_waitstatus_to_exitcode(self):
2954 exitcode = 23
2955 code = f'import sys; sys.exit({exitcode})'
2956 self.check_waitpid(code, exitcode=exitcode)
2957
2958 with self.assertRaises(TypeError):
2959 os.waitstatus_to_exitcode(0.0)
2960
2961 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2962 def test_waitpid_windows(self):
2963 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2964 # with exit code larger than INT_MAX.
2965 STATUS_CONTROL_C_EXIT = 0xC000013A
2966 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2967 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2968
2969 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2970 def test_waitstatus_to_exitcode_windows(self):
2971 max_exitcode = 2 ** 32 - 1
2972 for exitcode in (0, 1, 5, max_exitcode):
2973 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2974 exitcode)
2975
2976 # invalid values
2977 with self.assertRaises(ValueError):
2978 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2979 with self.assertRaises(OverflowError):
2980 os.waitstatus_to_exitcode(-1)
2981
Victor Stinner65a796e2020-04-01 18:49:29 +02002982 # Skip the test on Windows
2983 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2984 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002985 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002986 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002987
Victor Stinner9bee32b2020-04-22 16:30:35 +02002988 def kill_process(pid):
2989 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002990
Victor Stinner9bee32b2020-04-22 16:30:35 +02002991 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002992
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002993
Victor Stinner4659ccf2016-09-14 10:57:00 +02002994class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002995 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002996 self.exitcode = 17
2997
Hai Shi0c4f0f32020-06-30 21:46:31 +08002998 filename = os_helper.TESTFN
2999 self.addCleanup(os_helper.unlink, filename)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003000
3001 if not with_env:
3002 code = 'import sys; sys.exit(%s)' % self.exitcode
3003 else:
3004 self.env = dict(os.environ)
3005 # create an unique key
3006 self.key = str(uuid.uuid4())
3007 self.env[self.key] = self.key
3008 # read the variable from os.environ to check that it exists
3009 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
3010 % (self.key, self.exitcode))
3011
3012 with open(filename, "w") as fp:
3013 fp.write(code)
3014
Berker Peksag81816462016-09-15 20:19:47 +03003015 args = [sys.executable, filename]
3016 if use_bytes:
3017 args = [os.fsencode(a) for a in args]
3018 self.env = {os.fsencode(k): os.fsencode(v)
3019 for k, v in self.env.items()}
3020
3021 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02003022
Berker Peksag4af23d72016-09-15 20:32:44 +03003023 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003024 def test_spawnl(self):
3025 args = self.create_args()
3026 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
3027 self.assertEqual(exitcode, self.exitcode)
3028
Berker Peksag4af23d72016-09-15 20:32:44 +03003029 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003030 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003031 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003032 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
3033 self.assertEqual(exitcode, self.exitcode)
3034
Berker Peksag4af23d72016-09-15 20:32:44 +03003035 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003036 def test_spawnlp(self):
3037 args = self.create_args()
3038 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
3039 self.assertEqual(exitcode, self.exitcode)
3040
Berker Peksag4af23d72016-09-15 20:32:44 +03003041 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003042 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003043 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003044 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
3045 self.assertEqual(exitcode, self.exitcode)
3046
Berker Peksag4af23d72016-09-15 20:32:44 +03003047 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003048 def test_spawnv(self):
3049 args = self.create_args()
3050 exitcode = os.spawnv(os.P_WAIT, args[0], args)
3051 self.assertEqual(exitcode, self.exitcode)
3052
Victor Stinner9bee32b2020-04-22 16:30:35 +02003053 # Test for PyUnicode_FSConverter()
3054 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
3055 self.assertEqual(exitcode, self.exitcode)
3056
Berker Peksag4af23d72016-09-15 20:32:44 +03003057 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003058 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003059 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003060 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3061 self.assertEqual(exitcode, self.exitcode)
3062
Berker Peksag4af23d72016-09-15 20:32:44 +03003063 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003064 def test_spawnvp(self):
3065 args = self.create_args()
3066 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
3067 self.assertEqual(exitcode, self.exitcode)
3068
Berker Peksag4af23d72016-09-15 20:32:44 +03003069 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003070 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03003071 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003072 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
3073 self.assertEqual(exitcode, self.exitcode)
3074
Berker Peksag4af23d72016-09-15 20:32:44 +03003075 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02003076 def test_nowait(self):
3077 args = self.create_args()
3078 pid = os.spawnv(os.P_NOWAIT, args[0], args)
Victor Stinner278c1e12020-03-31 20:08:12 +02003079 support.wait_process(pid, exitcode=self.exitcode)
Victor Stinner4659ccf2016-09-14 10:57:00 +02003080
Berker Peksag4af23d72016-09-15 20:32:44 +03003081 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03003082 def test_spawnve_bytes(self):
3083 # Test bytes handling in parse_arglist and parse_envlist (#28114)
3084 args = self.create_args(with_env=True, use_bytes=True)
3085 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3086 self.assertEqual(exitcode, self.exitcode)
3087
Steve Dower859fd7b2016-11-19 18:53:19 -08003088 @requires_os_func('spawnl')
3089 def test_spawnl_noargs(self):
3090 args = self.create_args()
3091 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08003092 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08003093
3094 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08003095 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08003096 args = self.create_args()
3097 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08003098 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08003099
3100 @requires_os_func('spawnv')
3101 def test_spawnv_noargs(self):
3102 args = self.create_args()
3103 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
3104 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08003105 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
3106 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08003107
3108 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08003109 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08003110 args = self.create_args()
3111 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
3112 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08003113 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
3114 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02003115
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003116 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03003117 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003118
Ville Skyttä49b27342017-08-03 09:00:59 +03003119 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003120 newenv = os.environ.copy()
3121 newenv["FRUIT\0VEGETABLE"] = "cabbage"
3122 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003123 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003124 except ValueError:
3125 pass
3126 else:
3127 self.assertEqual(exitcode, 127)
3128
Ville Skyttä49b27342017-08-03 09:00:59 +03003129 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03003130 newenv = os.environ.copy()
3131 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
3132 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003133 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003134 except ValueError:
3135 pass
3136 else:
3137 self.assertEqual(exitcode, 127)
3138
Ville Skyttä49b27342017-08-03 09:00:59 +03003139 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003140 newenv = os.environ.copy()
3141 newenv["FRUIT=ORANGE"] = "lemon"
3142 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003143 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003144 except ValueError:
3145 pass
3146 else:
3147 self.assertEqual(exitcode, 127)
3148
Ville Skyttä49b27342017-08-03 09:00:59 +03003149 # equal character in the environment variable value
Hai Shi0c4f0f32020-06-30 21:46:31 +08003150 filename = os_helper.TESTFN
3151 self.addCleanup(os_helper.unlink, filename)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003152 with open(filename, "w") as fp:
3153 fp.write('import sys, os\n'
3154 'if os.getenv("FRUIT") != "orange=lemon":\n'
3155 ' raise AssertionError')
3156 args = [sys.executable, filename]
3157 newenv = os.environ.copy()
3158 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003159 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003160 self.assertEqual(exitcode, 0)
3161
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003162 @requires_os_func('spawnve')
3163 def test_spawnve_invalid_env(self):
3164 self._test_invalid_env(os.spawnve)
3165
3166 @requires_os_func('spawnvpe')
3167 def test_spawnvpe_invalid_env(self):
3168 self._test_invalid_env(os.spawnvpe)
3169
Serhiy Storchaka77703942017-06-25 07:33:01 +03003170
Brian Curtin0151b8e2010-09-24 13:43:43 +00003171# The introduction of this TestCase caused at least two different errors on
3172# *nix buildbots. Temporarily skip this to let the buildbots move along.
3173@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00003174@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3175class LoginTests(unittest.TestCase):
3176 def test_getlogin(self):
3177 user_name = os.getlogin()
3178 self.assertNotEqual(len(user_name), 0)
3179
3180
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003181@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3182 "needs os.getpriority and os.setpriority")
3183class ProgramPriorityTests(unittest.TestCase):
3184 """Tests for os.getpriority() and os.setpriority()."""
3185
3186 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003187
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003188 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3189 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3190 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003191 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3192 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01003193 raise unittest.SkipTest("unable to reliably test setpriority "
3194 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003195 else:
3196 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003197 finally:
3198 try:
3199 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3200 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00003201 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003202 raise
3203
3204
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003205class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003206
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003207 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003208
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 def __init__(self, conn):
3210 asynchat.async_chat.__init__(self, conn)
3211 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003212 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003213 self.closed = False
3214 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003215
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 def handle_read(self):
3217 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003218 if self.accumulate:
3219 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003220
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003221 def get_data(self):
3222 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003223
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003224 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003225 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003226 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003227
3228 def handle_error(self):
3229 raise
3230
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003231 def __init__(self, address):
3232 threading.Thread.__init__(self)
3233 asyncore.dispatcher.__init__(self)
3234 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3235 self.bind(address)
3236 self.listen(5)
3237 self.host, self.port = self.socket.getsockname()[:2]
3238 self.handler_instance = None
3239 self._active = False
3240 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003241
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003242 # --- public API
3243
3244 @property
3245 def running(self):
3246 return self._active
3247
3248 def start(self):
3249 assert not self.running
3250 self.__flag = threading.Event()
3251 threading.Thread.start(self)
3252 self.__flag.wait()
3253
3254 def stop(self):
3255 assert self.running
3256 self._active = False
3257 self.join()
3258
3259 def wait(self):
3260 # wait for handler connection to be closed, then stop the server
3261 while not getattr(self.handler_instance, "closed", False):
3262 time.sleep(0.001)
3263 self.stop()
3264
3265 # --- internals
3266
3267 def run(self):
3268 self._active = True
3269 self.__flag.set()
3270 while self._active and asyncore.socket_map:
3271 self._active_lock.acquire()
3272 asyncore.loop(timeout=0.001, count=1)
3273 self._active_lock.release()
3274 asyncore.close_all()
3275
3276 def handle_accept(self):
3277 conn, addr = self.accept()
3278 self.handler_instance = self.Handler(conn)
3279
3280 def handle_connect(self):
3281 self.close()
3282 handle_read = handle_connect
3283
3284 def writable(self):
3285 return 0
3286
3287 def handle_error(self):
3288 raise
3289
3290
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003291@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3292class TestSendfile(unittest.TestCase):
3293
Victor Stinner8c663fd2017-11-08 14:44:44 -08003294 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003295 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003296 not sys.platform.startswith("solaris") and \
3297 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003298 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3299 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003300 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3301 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003302
3303 @classmethod
3304 def setUpClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003305 cls.key = threading_helper.threading_setup()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003306 create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003307
3308 @classmethod
3309 def tearDownClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003310 threading_helper.threading_cleanup(*cls.key)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003311 os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003312
3313 def setUp(self):
Serhiy Storchaka16994912020-04-25 10:06:29 +03003314 self.server = SendfileTestServer((socket_helper.HOST, 0))
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003315 self.server.start()
3316 self.client = socket.socket()
3317 self.client.connect((self.server.host, self.server.port))
3318 self.client.settimeout(1)
3319 # synchronize by waiting for "220 ready" response
3320 self.client.recv(1024)
3321 self.sockno = self.client.fileno()
Hai Shi0c4f0f32020-06-30 21:46:31 +08003322 self.file = open(os_helper.TESTFN, 'rb')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003323 self.fileno = self.file.fileno()
3324
3325 def tearDown(self):
3326 self.file.close()
3327 self.client.close()
3328 if self.server.running:
3329 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003330 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003331
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003332 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003333 """A higher level wrapper representing how an application is
3334 supposed to use sendfile().
3335 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003336 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003337 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003338 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003339 except OSError as err:
3340 if err.errno == errno.ECONNRESET:
3341 # disconnected
3342 raise
3343 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3344 # we have to retry send data
3345 continue
3346 else:
3347 raise
3348
3349 def test_send_whole_file(self):
3350 # normal send
3351 total_sent = 0
3352 offset = 0
3353 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003354 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003355 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3356 if sent == 0:
3357 break
3358 offset += sent
3359 total_sent += sent
3360 self.assertTrue(sent <= nbytes)
3361 self.assertEqual(offset, total_sent)
3362
3363 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003364 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003365 self.client.close()
3366 self.server.wait()
3367 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003368 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003369 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003370
3371 def test_send_at_certain_offset(self):
3372 # start sending a file at a certain offset
3373 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003374 offset = len(self.DATA) // 2
3375 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003376 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003377 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003378 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3379 if sent == 0:
3380 break
3381 offset += sent
3382 total_sent += sent
3383 self.assertTrue(sent <= nbytes)
3384
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003385 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003386 self.client.close()
3387 self.server.wait()
3388 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003389 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003390 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003391 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003392 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003393
3394 def test_offset_overflow(self):
3395 # specify an offset > file size
3396 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003397 try:
3398 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3399 except OSError as e:
3400 # Solaris can raise EINVAL if offset >= file length, ignore.
3401 if e.errno != errno.EINVAL:
3402 raise
3403 else:
3404 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003405 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003406 self.client.close()
3407 self.server.wait()
3408 data = self.server.handler_instance.get_data()
3409 self.assertEqual(data, b'')
3410
3411 def test_invalid_offset(self):
3412 with self.assertRaises(OSError) as cm:
3413 os.sendfile(self.sockno, self.fileno, -1, 4096)
3414 self.assertEqual(cm.exception.errno, errno.EINVAL)
3415
Martin Panterbf19d162015-09-09 01:01:13 +00003416 def test_keywords(self):
3417 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003418 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3419 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003420 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003421 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3422 offset=0, count=4096,
3423 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003424
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003425 # --- headers / trailers tests
3426
Serhiy Storchaka43767632013-11-03 21:31:38 +02003427 @requires_headers_trailers
3428 def test_headers(self):
3429 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003430 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003431 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003432 headers=[b"x" * 512, b"y" * 256])
3433 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003434 total_sent += sent
3435 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003436 while total_sent < len(expected_data):
3437 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003438 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3439 offset, nbytes)
3440 if sent == 0:
3441 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003442 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003443 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003444 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003445
Serhiy Storchaka43767632013-11-03 21:31:38 +02003446 self.assertEqual(total_sent, len(expected_data))
3447 self.client.close()
3448 self.server.wait()
3449 data = self.server.handler_instance.get_data()
3450 self.assertEqual(hash(data), hash(expected_data))
3451
3452 @requires_headers_trailers
3453 def test_trailers(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003454 TESTFN2 = os_helper.TESTFN + "2"
Serhiy Storchaka43767632013-11-03 21:31:38 +02003455 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003456
Hai Shi0c4f0f32020-06-30 21:46:31 +08003457 self.addCleanup(os_helper.unlink, TESTFN2)
Victor Stinnerae39d232016-03-24 17:12:55 +01003458 create_file(TESTFN2, file_data)
3459
3460 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003461 os.sendfile(self.sockno, f.fileno(), 0, 5,
3462 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003463 self.client.close()
3464 self.server.wait()
3465 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003466 self.assertEqual(data, b"abcde123456789")
3467
3468 @requires_headers_trailers
3469 @requires_32b
3470 def test_headers_overflow_32bits(self):
3471 self.server.handler_instance.accumulate = False
3472 with self.assertRaises(OSError) as cm:
3473 os.sendfile(self.sockno, self.fileno, 0, 0,
3474 headers=[b"x" * 2**16] * 2**15)
3475 self.assertEqual(cm.exception.errno, errno.EINVAL)
3476
3477 @requires_headers_trailers
3478 @requires_32b
3479 def test_trailers_overflow_32bits(self):
3480 self.server.handler_instance.accumulate = False
3481 with self.assertRaises(OSError) as cm:
3482 os.sendfile(self.sockno, self.fileno, 0, 0,
3483 trailers=[b"x" * 2**16] * 2**15)
3484 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003485
Serhiy Storchaka43767632013-11-03 21:31:38 +02003486 @requires_headers_trailers
3487 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3488 'test needs os.SF_NODISKIO')
3489 def test_flags(self):
3490 try:
3491 os.sendfile(self.sockno, self.fileno, 0, 4096,
3492 flags=os.SF_NODISKIO)
3493 except OSError as err:
3494 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3495 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003496
3497
Larry Hastings9cf065c2012-06-22 16:30:09 -07003498def supports_extended_attributes():
3499 if not hasattr(os, "setxattr"):
3500 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003501
Larry Hastings9cf065c2012-06-22 16:30:09 -07003502 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003503 with open(os_helper.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003504 try:
3505 os.setxattr(fp.fileno(), b"user.test", b"")
3506 except OSError:
3507 return False
3508 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003509 os_helper.unlink(os_helper.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003510
3511 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003512
3513
3514@unittest.skipUnless(supports_extended_attributes(),
3515 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003516# Kernels < 2.6.39 don't respect setxattr flags.
3517@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003518class ExtendedAttributeTests(unittest.TestCase):
3519
Larry Hastings9cf065c2012-06-22 16:30:09 -07003520 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003521 fn = os_helper.TESTFN
3522 self.addCleanup(os_helper.unlink, fn)
Victor Stinnerae39d232016-03-24 17:12:55 +01003523 create_file(fn)
3524
Benjamin Peterson799bd802011-08-31 22:15:17 -04003525 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003526 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003527 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003528
Victor Stinnerf12e5062011-10-16 22:12:03 +02003529 init_xattr = listxattr(fn)
3530 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003531
Larry Hastings9cf065c2012-06-22 16:30:09 -07003532 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003533 xattr = set(init_xattr)
3534 xattr.add("user.test")
3535 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003536 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3537 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3538 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003539
Benjamin Peterson799bd802011-08-31 22:15:17 -04003540 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003541 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003542 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003543
Benjamin Peterson799bd802011-08-31 22:15:17 -04003544 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003545 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003546 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003547
Larry Hastings9cf065c2012-06-22 16:30:09 -07003548 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003549 xattr.add("user.test2")
3550 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003551 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003552
Benjamin Peterson799bd802011-08-31 22:15:17 -04003553 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003554 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003555 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003556
Victor Stinnerf12e5062011-10-16 22:12:03 +02003557 xattr.remove("user.test")
3558 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003559 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3560 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3561 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3562 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003563 many = sorted("user.test{}".format(i) for i in range(100))
3564 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003565 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003566 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003567
Larry Hastings9cf065c2012-06-22 16:30:09 -07003568 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003569 self._check_xattrs_str(str, *args, **kwargs)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003570 os_helper.unlink(os_helper.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003571
3572 self._check_xattrs_str(os.fsencode, *args, **kwargs)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003573 os_helper.unlink(os_helper.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003574
3575 def test_simple(self):
3576 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3577 os.listxattr)
3578
3579 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003580 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3581 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003582
3583 def test_fds(self):
3584 def getxattr(path, *args):
3585 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003586 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003587 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003588 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003589 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003590 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003591 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003592 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003593 def listxattr(path, *args):
3594 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003595 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003596 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3597
3598
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003599@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3600class TermsizeTests(unittest.TestCase):
3601 def test_does_not_crash(self):
3602 """Check if get_terminal_size() returns a meaningful value.
3603
3604 There's no easy portable way to actually check the size of the
3605 terminal, so let's check if it returns something sensible instead.
3606 """
3607 try:
3608 size = os.get_terminal_size()
3609 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003610 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003611 # Under win32 a generic OSError can be thrown if the
3612 # handle cannot be retrieved
3613 self.skipTest("failed to query terminal size")
3614 raise
3615
Antoine Pitroucfade362012-02-08 23:48:59 +01003616 self.assertGreaterEqual(size.columns, 0)
3617 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003618
3619 def test_stty_match(self):
3620 """Check if stty returns the same results
3621
3622 stty actually tests stdin, so get_terminal_size is invoked on
3623 stdin explicitly. If stty succeeded, then get_terminal_size()
3624 should work too.
3625 """
3626 try:
Batuhan Taskayad5a980a2020-05-17 01:38:02 +03003627 size = (
3628 subprocess.check_output(
3629 ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3630 ).split()
3631 )
xdegaye6a55d092017-11-12 17:57:04 +01003632 except (FileNotFoundError, subprocess.CalledProcessError,
3633 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003634 self.skipTest("stty invocation failed")
3635 expected = (int(size[1]), int(size[0])) # reversed order
3636
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003637 try:
3638 actual = os.get_terminal_size(sys.__stdin__.fileno())
3639 except OSError as e:
3640 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3641 # Under win32 a generic OSError can be thrown if the
3642 # handle cannot be retrieved
3643 self.skipTest("failed to query terminal size")
3644 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003645 self.assertEqual(expected, actual)
3646
3647
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003648@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003649@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003650class MemfdCreateTests(unittest.TestCase):
3651 def test_memfd_create(self):
3652 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3653 self.assertNotEqual(fd, -1)
3654 self.addCleanup(os.close, fd)
3655 self.assertFalse(os.get_inheritable(fd))
3656 with open(fd, "wb", closefd=False) as f:
3657 f.write(b'memfd_create')
3658 self.assertEqual(f.tell(), 12)
3659
3660 fd2 = os.memfd_create("Hi")
3661 self.addCleanup(os.close, fd2)
3662 self.assertFalse(os.get_inheritable(fd2))
3663
3664
Christian Heimescd9fed62020-11-13 19:48:52 +01003665@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
3666@support.requires_linux_version(2, 6, 30)
3667class EventfdTests(unittest.TestCase):
3668 def test_eventfd_initval(self):
3669 def pack(value):
3670 """Pack as native uint64_t
3671 """
3672 return struct.pack("@Q", value)
3673 size = 8 # read/write 8 bytes
3674 initval = 42
3675 fd = os.eventfd(initval)
3676 self.assertNotEqual(fd, -1)
3677 self.addCleanup(os.close, fd)
3678 self.assertFalse(os.get_inheritable(fd))
3679
3680 # test with raw read/write
3681 res = os.read(fd, size)
3682 self.assertEqual(res, pack(initval))
3683
3684 os.write(fd, pack(23))
3685 res = os.read(fd, size)
3686 self.assertEqual(res, pack(23))
3687
3688 os.write(fd, pack(40))
3689 os.write(fd, pack(2))
3690 res = os.read(fd, size)
3691 self.assertEqual(res, pack(42))
3692
3693 # test with eventfd_read/eventfd_write
3694 os.eventfd_write(fd, 20)
3695 os.eventfd_write(fd, 3)
3696 res = os.eventfd_read(fd)
3697 self.assertEqual(res, 23)
3698
3699 def test_eventfd_semaphore(self):
3700 initval = 2
3701 flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
3702 fd = os.eventfd(initval, flags)
3703 self.assertNotEqual(fd, -1)
3704 self.addCleanup(os.close, fd)
3705
3706 # semaphore starts has initval 2, two reads return '1'
3707 res = os.eventfd_read(fd)
3708 self.assertEqual(res, 1)
3709 res = os.eventfd_read(fd)
3710 self.assertEqual(res, 1)
3711 # third read would block
3712 with self.assertRaises(BlockingIOError):
3713 os.eventfd_read(fd)
3714 with self.assertRaises(BlockingIOError):
3715 os.read(fd, 8)
3716
3717 # increase semaphore counter, read one
3718 os.eventfd_write(fd, 1)
3719 res = os.eventfd_read(fd)
3720 self.assertEqual(res, 1)
3721 # next read would block, too
3722 with self.assertRaises(BlockingIOError):
3723 os.eventfd_read(fd)
3724
3725 def test_eventfd_select(self):
3726 flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
3727 fd = os.eventfd(0, flags)
3728 self.assertNotEqual(fd, -1)
3729 self.addCleanup(os.close, fd)
3730
3731 # counter is zero, only writeable
3732 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3733 self.assertEqual((rfd, wfd, xfd), ([], [fd], []))
3734
3735 # counter is non-zero, read and writeable
3736 os.eventfd_write(fd, 23)
3737 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3738 self.assertEqual((rfd, wfd, xfd), ([fd], [fd], []))
3739 self.assertEqual(os.eventfd_read(fd), 23)
3740
3741 # counter at max, only readable
3742 os.eventfd_write(fd, (2**64) - 2)
3743 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3744 self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
3745 os.eventfd_read(fd)
3746
3747
Victor Stinner292c8352012-10-30 02:17:38 +01003748class OSErrorTests(unittest.TestCase):
3749 def setUp(self):
3750 class Str(str):
3751 pass
3752
Victor Stinnerafe17062012-10-31 22:47:43 +01003753 self.bytes_filenames = []
3754 self.unicode_filenames = []
Hai Shi0c4f0f32020-06-30 21:46:31 +08003755 if os_helper.TESTFN_UNENCODABLE is not None:
3756 decoded = os_helper.TESTFN_UNENCODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003757 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003758 decoded = os_helper.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003759 self.unicode_filenames.append(decoded)
3760 self.unicode_filenames.append(Str(decoded))
Hai Shi0c4f0f32020-06-30 21:46:31 +08003761 if os_helper.TESTFN_UNDECODABLE is not None:
3762 encoded = os_helper.TESTFN_UNDECODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003763 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003764 encoded = os.fsencode(os_helper.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003765 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003766 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003767 self.bytes_filenames.append(memoryview(encoded))
3768
3769 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003770
3771 def test_oserror_filename(self):
3772 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003773 (self.filenames, os.chdir,),
3774 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003775 (self.filenames, os.lstat,),
3776 (self.filenames, os.open, os.O_RDONLY),
3777 (self.filenames, os.rmdir,),
3778 (self.filenames, os.stat,),
3779 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003780 ]
3781 if sys.platform == "win32":
3782 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003783 (self.bytes_filenames, os.rename, b"dst"),
3784 (self.bytes_filenames, os.replace, b"dst"),
3785 (self.unicode_filenames, os.rename, "dst"),
3786 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003787 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003788 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003789 else:
3790 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003791 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003792 (self.filenames, os.rename, "dst"),
3793 (self.filenames, os.replace, "dst"),
3794 ))
3795 if hasattr(os, "chown"):
3796 funcs.append((self.filenames, os.chown, 0, 0))
3797 if hasattr(os, "lchown"):
3798 funcs.append((self.filenames, os.lchown, 0, 0))
3799 if hasattr(os, "truncate"):
3800 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003801 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003802 funcs.append((self.filenames, os.chflags, 0))
3803 if hasattr(os, "lchflags"):
3804 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003805 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003806 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003807 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003808 if sys.platform == "win32":
3809 funcs.append((self.bytes_filenames, os.link, b"dst"))
3810 funcs.append((self.unicode_filenames, os.link, "dst"))
3811 else:
3812 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003813 if hasattr(os, "listxattr"):
3814 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003815 (self.filenames, os.listxattr,),
3816 (self.filenames, os.getxattr, "user.test"),
3817 (self.filenames, os.setxattr, "user.test", b'user'),
3818 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003819 ))
3820 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003821 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003822 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003823 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003824
Steve Dowercc16be82016-09-08 10:35:16 -07003825
Victor Stinnerafe17062012-10-31 22:47:43 +01003826 for filenames, func, *func_args in funcs:
3827 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003828 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003829 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003830 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003831 else:
3832 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3833 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003834 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003835 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003836 except UnicodeDecodeError:
3837 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003838 else:
3839 self.fail("No exception thrown by {}".format(func))
3840
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003841class CPUCountTests(unittest.TestCase):
3842 def test_cpu_count(self):
3843 cpus = os.cpu_count()
3844 if cpus is not None:
3845 self.assertIsInstance(cpus, int)
3846 self.assertGreater(cpus, 0)
3847 else:
3848 self.skipTest("Could not determine the number of CPUs")
3849
Victor Stinnerdaf45552013-08-28 00:53:59 +02003850
3851class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003852 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003853 fd = os.open(__file__, os.O_RDONLY)
3854 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003855 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003856
Victor Stinnerdaf45552013-08-28 00:53:59 +02003857 os.set_inheritable(fd, True)
3858 self.assertEqual(os.get_inheritable(fd), True)
3859
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003860 @unittest.skipIf(fcntl is None, "need fcntl")
3861 def test_get_inheritable_cloexec(self):
3862 fd = os.open(__file__, os.O_RDONLY)
3863 self.addCleanup(os.close, fd)
3864 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003865
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003866 # clear FD_CLOEXEC flag
3867 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3868 flags &= ~fcntl.FD_CLOEXEC
3869 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003870
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003871 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003872
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003873 @unittest.skipIf(fcntl is None, "need fcntl")
3874 def test_set_inheritable_cloexec(self):
3875 fd = os.open(__file__, os.O_RDONLY)
3876 self.addCleanup(os.close, fd)
3877 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3878 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003879
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003880 os.set_inheritable(fd, True)
3881 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3882 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003883
cptpcrd7dc71c42021-01-20 09:05:51 -05003884 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3885 def test_get_set_inheritable_o_path(self):
3886 fd = os.open(__file__, os.O_PATH)
3887 self.addCleanup(os.close, fd)
3888 self.assertEqual(os.get_inheritable(fd), False)
3889
3890 os.set_inheritable(fd, True)
3891 self.assertEqual(os.get_inheritable(fd), True)
3892
3893 os.set_inheritable(fd, False)
3894 self.assertEqual(os.get_inheritable(fd), False)
3895
3896 def test_get_set_inheritable_badf(self):
3897 fd = os_helper.make_bad_fd()
3898
3899 with self.assertRaises(OSError) as ctx:
3900 os.get_inheritable(fd)
3901 self.assertEqual(ctx.exception.errno, errno.EBADF)
3902
3903 with self.assertRaises(OSError) as ctx:
3904 os.set_inheritable(fd, True)
3905 self.assertEqual(ctx.exception.errno, errno.EBADF)
3906
3907 with self.assertRaises(OSError) as ctx:
3908 os.set_inheritable(fd, False)
3909 self.assertEqual(ctx.exception.errno, errno.EBADF)
3910
Victor Stinnerdaf45552013-08-28 00:53:59 +02003911 def test_open(self):
3912 fd = os.open(__file__, os.O_RDONLY)
3913 self.addCleanup(os.close, fd)
3914 self.assertEqual(os.get_inheritable(fd), False)
3915
3916 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3917 def test_pipe(self):
3918 rfd, wfd = os.pipe()
3919 self.addCleanup(os.close, rfd)
3920 self.addCleanup(os.close, wfd)
3921 self.assertEqual(os.get_inheritable(rfd), False)
3922 self.assertEqual(os.get_inheritable(wfd), False)
3923
3924 def test_dup(self):
3925 fd1 = os.open(__file__, os.O_RDONLY)
3926 self.addCleanup(os.close, fd1)
3927
3928 fd2 = os.dup(fd1)
3929 self.addCleanup(os.close, fd2)
3930 self.assertEqual(os.get_inheritable(fd2), False)
3931
Zackery Spytz5be66602019-08-23 12:38:41 -06003932 def test_dup_standard_stream(self):
3933 fd = os.dup(1)
3934 self.addCleanup(os.close, fd)
3935 self.assertGreater(fd, 0)
3936
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003937 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3938 def test_dup_nul(self):
3939 # os.dup() was creating inheritable fds for character files.
3940 fd1 = os.open('NUL', os.O_RDONLY)
3941 self.addCleanup(os.close, fd1)
3942 fd2 = os.dup(fd1)
3943 self.addCleanup(os.close, fd2)
3944 self.assertFalse(os.get_inheritable(fd2))
3945
Victor Stinnerdaf45552013-08-28 00:53:59 +02003946 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3947 def test_dup2(self):
3948 fd = os.open(__file__, os.O_RDONLY)
3949 self.addCleanup(os.close, fd)
3950
3951 # inheritable by default
3952 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003953 self.addCleanup(os.close, fd2)
3954 self.assertEqual(os.dup2(fd, fd2), fd2)
3955 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003956
3957 # force non-inheritable
3958 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003959 self.addCleanup(os.close, fd3)
3960 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3961 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003962
3963 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3964 def test_openpty(self):
3965 master_fd, slave_fd = os.openpty()
3966 self.addCleanup(os.close, master_fd)
3967 self.addCleanup(os.close, slave_fd)
3968 self.assertEqual(os.get_inheritable(master_fd), False)
3969 self.assertEqual(os.get_inheritable(slave_fd), False)
3970
3971
Brett Cannon3f9183b2016-08-26 14:44:48 -07003972class PathTConverterTests(unittest.TestCase):
3973 # tuples of (function name, allows fd arguments, additional arguments to
3974 # function, cleanup function)
3975 functions = [
3976 ('stat', True, (), None),
3977 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003978 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003979 ('chflags', False, (0,), None),
3980 ('lchflags', False, (0,), None),
3981 ('open', False, (0,), getattr(os, 'close', None)),
3982 ]
3983
3984 def test_path_t_converter(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003985 str_filename = os_helper.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003986 if os.name == 'nt':
3987 bytes_fspath = bytes_filename = None
3988 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003989 bytes_filename = os.fsencode(os_helper.TESTFN)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003990 bytes_fspath = FakePath(bytes_filename)
3991 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Hai Shi0c4f0f32020-06-30 21:46:31 +08003992 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003993 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003994
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003995 int_fspath = FakePath(fd)
3996 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003997
3998 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3999 with self.subTest(name=name):
4000 try:
4001 fn = getattr(os, name)
4002 except AttributeError:
4003 continue
4004
Brett Cannon8f96a302016-08-26 19:30:11 -07004005 for path in (str_filename, bytes_filename, str_fspath,
4006 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07004007 if path is None:
4008 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07004009 with self.subTest(name=name, path=path):
4010 result = fn(path, *extra_args)
4011 if cleanup_fn is not None:
4012 cleanup_fn(result)
4013
4014 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004015 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07004016 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07004017
4018 if allow_fd:
4019 result = fn(fd, *extra_args) # should not fail
4020 if cleanup_fn is not None:
4021 cleanup_fn(result)
4022 else:
4023 with self.assertRaisesRegex(
4024 TypeError,
4025 'os.PathLike'):
4026 fn(fd, *extra_args)
4027
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004028 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004029 msg = r'__fspath__\(\) to return str or bytes, not %s'
4030 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004031 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004032 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004033 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02004034 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00004035 os.stat(FakePath(object()))
4036
Brett Cannon3f9183b2016-08-26 14:44:48 -07004037
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004038@unittest.skipUnless(hasattr(os, 'get_blocking'),
4039 'needs os.get_blocking() and os.set_blocking()')
4040class BlockingTests(unittest.TestCase):
4041 def test_blocking(self):
4042 fd = os.open(__file__, os.O_RDONLY)
4043 self.addCleanup(os.close, fd)
4044 self.assertEqual(os.get_blocking(fd), True)
4045
4046 os.set_blocking(fd, False)
4047 self.assertEqual(os.get_blocking(fd), False)
4048
4049 os.set_blocking(fd, True)
4050 self.assertEqual(os.get_blocking(fd), True)
4051
4052
Yury Selivanov97e2e062014-09-26 12:33:06 -04004053
4054class ExportsTests(unittest.TestCase):
4055 def test_os_all(self):
4056 self.assertIn('open', os.__all__)
4057 self.assertIn('walk', os.__all__)
4058
4059
Eddie Elizondob3966632019-11-05 07:16:14 -08004060class TestDirEntry(unittest.TestCase):
4061 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004062 self.path = os.path.realpath(os_helper.TESTFN)
4063 self.addCleanup(os_helper.rmtree, self.path)
Eddie Elizondob3966632019-11-05 07:16:14 -08004064 os.mkdir(self.path)
4065
4066 def test_uninstantiable(self):
4067 self.assertRaises(TypeError, os.DirEntry)
4068
4069 def test_unpickable(self):
4070 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
4071 entry = [entry for entry in os.scandir(self.path)].pop()
4072 self.assertIsInstance(entry, os.DirEntry)
4073 self.assertEqual(entry.name, "file.txt")
4074 import pickle
4075 self.assertRaises(TypeError, pickle.dumps, entry, filename)
4076
4077
Victor Stinner6036e442015-03-08 01:58:04 +01004078class TestScandir(unittest.TestCase):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004079 check_no_resource_warning = warnings_helper.check_no_resource_warning
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004080
Victor Stinner6036e442015-03-08 01:58:04 +01004081 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004082 self.path = os.path.realpath(os_helper.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07004083 self.bytes_path = os.fsencode(self.path)
Hai Shi0c4f0f32020-06-30 21:46:31 +08004084 self.addCleanup(os_helper.rmtree, self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01004085 os.mkdir(self.path)
4086
4087 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07004088 path = self.bytes_path if isinstance(name, bytes) else self.path
4089 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01004090 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01004091 return filename
4092
4093 def get_entries(self, names):
4094 entries = dict((entry.name, entry)
4095 for entry in os.scandir(self.path))
4096 self.assertEqual(sorted(entries.keys()), names)
4097 return entries
4098
4099 def assert_stat_equal(self, stat1, stat2, skip_fields):
4100 if skip_fields:
4101 for attr in dir(stat1):
4102 if not attr.startswith("st_"):
4103 continue
4104 if attr in ("st_dev", "st_ino", "st_nlink"):
4105 continue
4106 self.assertEqual(getattr(stat1, attr),
4107 getattr(stat2, attr),
4108 (stat1, stat2, attr))
4109 else:
4110 self.assertEqual(stat1, stat2)
4111
Eddie Elizondob3966632019-11-05 07:16:14 -08004112 def test_uninstantiable(self):
4113 scandir_iter = os.scandir(self.path)
4114 self.assertRaises(TypeError, type(scandir_iter))
4115 scandir_iter.close()
4116
4117 def test_unpickable(self):
4118 filename = self.create_file("file.txt")
4119 scandir_iter = os.scandir(self.path)
4120 import pickle
4121 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
4122 scandir_iter.close()
4123
Victor Stinner6036e442015-03-08 01:58:04 +01004124 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07004125 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01004126 self.assertEqual(entry.name, name)
4127 self.assertEqual(entry.path, os.path.join(self.path, name))
4128 self.assertEqual(entry.inode(),
4129 os.stat(entry.path, follow_symlinks=False).st_ino)
4130
4131 entry_stat = os.stat(entry.path)
4132 self.assertEqual(entry.is_dir(),
4133 stat.S_ISDIR(entry_stat.st_mode))
4134 self.assertEqual(entry.is_file(),
4135 stat.S_ISREG(entry_stat.st_mode))
4136 self.assertEqual(entry.is_symlink(),
4137 os.path.islink(entry.path))
4138
4139 entry_lstat = os.stat(entry.path, follow_symlinks=False)
4140 self.assertEqual(entry.is_dir(follow_symlinks=False),
4141 stat.S_ISDIR(entry_lstat.st_mode))
4142 self.assertEqual(entry.is_file(follow_symlinks=False),
4143 stat.S_ISREG(entry_lstat.st_mode))
4144
4145 self.assert_stat_equal(entry.stat(),
4146 entry_stat,
4147 os.name == 'nt' and not is_symlink)
4148 self.assert_stat_equal(entry.stat(follow_symlinks=False),
4149 entry_lstat,
4150 os.name == 'nt')
4151
4152 def test_attributes(self):
4153 link = hasattr(os, 'link')
Hai Shi0c4f0f32020-06-30 21:46:31 +08004154 symlink = os_helper.can_symlink()
Victor Stinner6036e442015-03-08 01:58:04 +01004155
4156 dirname = os.path.join(self.path, "dir")
4157 os.mkdir(dirname)
4158 filename = self.create_file("file.txt")
4159 if link:
xdegaye6a55d092017-11-12 17:57:04 +01004160 try:
4161 os.link(filename, os.path.join(self.path, "link_file.txt"))
4162 except PermissionError as e:
4163 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01004164 if symlink:
4165 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
4166 target_is_directory=True)
4167 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
4168
4169 names = ['dir', 'file.txt']
4170 if link:
4171 names.append('link_file.txt')
4172 if symlink:
4173 names.extend(('symlink_dir', 'symlink_file.txt'))
4174 entries = self.get_entries(names)
4175
4176 entry = entries['dir']
4177 self.check_entry(entry, 'dir', True, False, False)
4178
4179 entry = entries['file.txt']
4180 self.check_entry(entry, 'file.txt', False, True, False)
4181
4182 if link:
4183 entry = entries['link_file.txt']
4184 self.check_entry(entry, 'link_file.txt', False, True, False)
4185
4186 if symlink:
4187 entry = entries['symlink_dir']
4188 self.check_entry(entry, 'symlink_dir', True, False, True)
4189
4190 entry = entries['symlink_file.txt']
4191 self.check_entry(entry, 'symlink_file.txt', False, True, True)
4192
4193 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07004194 path = self.bytes_path if isinstance(name, bytes) else self.path
4195 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01004196 self.assertEqual(len(entries), 1)
4197
4198 entry = entries[0]
4199 self.assertEqual(entry.name, name)
4200 return entry
4201
Brett Cannon96881cd2016-06-10 14:37:21 -07004202 def create_file_entry(self, name='file.txt'):
4203 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01004204 return self.get_entry(os.path.basename(filename))
4205
4206 def test_current_directory(self):
4207 filename = self.create_file()
4208 old_dir = os.getcwd()
4209 try:
4210 os.chdir(self.path)
4211
4212 # call scandir() without parameter: it must list the content
4213 # of the current directory
4214 entries = dict((entry.name, entry) for entry in os.scandir())
4215 self.assertEqual(sorted(entries.keys()),
4216 [os.path.basename(filename)])
4217 finally:
4218 os.chdir(old_dir)
4219
4220 def test_repr(self):
4221 entry = self.create_file_entry()
4222 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
4223
Brett Cannon96881cd2016-06-10 14:37:21 -07004224 def test_fspath_protocol(self):
4225 entry = self.create_file_entry()
4226 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
4227
4228 def test_fspath_protocol_bytes(self):
4229 bytes_filename = os.fsencode('bytesfile.txt')
4230 bytes_entry = self.create_file_entry(name=bytes_filename)
4231 fspath = os.fspath(bytes_entry)
4232 self.assertIsInstance(fspath, bytes)
4233 self.assertEqual(fspath,
4234 os.path.join(os.fsencode(self.path),bytes_filename))
4235
Victor Stinner6036e442015-03-08 01:58:04 +01004236 def test_removed_dir(self):
4237 path = os.path.join(self.path, 'dir')
4238
4239 os.mkdir(path)
4240 entry = self.get_entry('dir')
4241 os.rmdir(path)
4242
4243 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4244 if os.name == 'nt':
4245 self.assertTrue(entry.is_dir())
4246 self.assertFalse(entry.is_file())
4247 self.assertFalse(entry.is_symlink())
4248 if os.name == 'nt':
4249 self.assertRaises(FileNotFoundError, entry.inode)
4250 # don't fail
4251 entry.stat()
4252 entry.stat(follow_symlinks=False)
4253 else:
4254 self.assertGreater(entry.inode(), 0)
4255 self.assertRaises(FileNotFoundError, entry.stat)
4256 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4257
4258 def test_removed_file(self):
4259 entry = self.create_file_entry()
4260 os.unlink(entry.path)
4261
4262 self.assertFalse(entry.is_dir())
4263 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4264 if os.name == 'nt':
4265 self.assertTrue(entry.is_file())
4266 self.assertFalse(entry.is_symlink())
4267 if os.name == 'nt':
4268 self.assertRaises(FileNotFoundError, entry.inode)
4269 # don't fail
4270 entry.stat()
4271 entry.stat(follow_symlinks=False)
4272 else:
4273 self.assertGreater(entry.inode(), 0)
4274 self.assertRaises(FileNotFoundError, entry.stat)
4275 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4276
4277 def test_broken_symlink(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08004278 if not os_helper.can_symlink():
Victor Stinner6036e442015-03-08 01:58:04 +01004279 return self.skipTest('cannot create symbolic link')
4280
4281 filename = self.create_file("file.txt")
4282 os.symlink(filename,
4283 os.path.join(self.path, "symlink.txt"))
4284 entries = self.get_entries(['file.txt', 'symlink.txt'])
4285 entry = entries['symlink.txt']
4286 os.unlink(filename)
4287
4288 self.assertGreater(entry.inode(), 0)
4289 self.assertFalse(entry.is_dir())
4290 self.assertFalse(entry.is_file()) # broken symlink returns False
4291 self.assertFalse(entry.is_dir(follow_symlinks=False))
4292 self.assertFalse(entry.is_file(follow_symlinks=False))
4293 self.assertTrue(entry.is_symlink())
4294 self.assertRaises(FileNotFoundError, entry.stat)
4295 # don't fail
4296 entry.stat(follow_symlinks=False)
4297
4298 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01004299 self.create_file("file.txt")
4300
4301 path_bytes = os.fsencode(self.path)
4302 entries = list(os.scandir(path_bytes))
4303 self.assertEqual(len(entries), 1, entries)
4304 entry = entries[0]
4305
4306 self.assertEqual(entry.name, b'file.txt')
4307 self.assertEqual(entry.path,
4308 os.fsencode(os.path.join(self.path, 'file.txt')))
4309
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03004310 def test_bytes_like(self):
4311 self.create_file("file.txt")
4312
4313 for cls in bytearray, memoryview:
4314 path_bytes = cls(os.fsencode(self.path))
4315 with self.assertWarns(DeprecationWarning):
4316 entries = list(os.scandir(path_bytes))
4317 self.assertEqual(len(entries), 1, entries)
4318 entry = entries[0]
4319
4320 self.assertEqual(entry.name, b'file.txt')
4321 self.assertEqual(entry.path,
4322 os.fsencode(os.path.join(self.path, 'file.txt')))
4323 self.assertIs(type(entry.name), bytes)
4324 self.assertIs(type(entry.path), bytes)
4325
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004326 @unittest.skipUnless(os.listdir in os.supports_fd,
4327 'fd support for listdir required for this test.')
4328 def test_fd(self):
4329 self.assertIn(os.scandir, os.supports_fd)
4330 self.create_file('file.txt')
4331 expected_names = ['file.txt']
Hai Shi0c4f0f32020-06-30 21:46:31 +08004332 if os_helper.can_symlink():
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004333 os.symlink('file.txt', os.path.join(self.path, 'link'))
4334 expected_names.append('link')
4335
4336 fd = os.open(self.path, os.O_RDONLY)
4337 try:
4338 with os.scandir(fd) as it:
4339 entries = list(it)
4340 names = [entry.name for entry in entries]
4341 self.assertEqual(sorted(names), expected_names)
4342 self.assertEqual(names, os.listdir(fd))
4343 for entry in entries:
4344 self.assertEqual(entry.path, entry.name)
4345 self.assertEqual(os.fspath(entry), entry.name)
4346 self.assertEqual(entry.is_symlink(), entry.name == 'link')
4347 if os.stat in os.supports_dir_fd:
4348 st = os.stat(entry.name, dir_fd=fd)
4349 self.assertEqual(entry.stat(), st)
4350 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4351 self.assertEqual(entry.stat(follow_symlinks=False), st)
4352 finally:
4353 os.close(fd)
4354
Victor Stinner6036e442015-03-08 01:58:04 +01004355 def test_empty_path(self):
4356 self.assertRaises(FileNotFoundError, os.scandir, '')
4357
4358 def test_consume_iterator_twice(self):
4359 self.create_file("file.txt")
4360 iterator = os.scandir(self.path)
4361
4362 entries = list(iterator)
4363 self.assertEqual(len(entries), 1, entries)
4364
4365 # check than consuming the iterator twice doesn't raise exception
4366 entries2 = list(iterator)
4367 self.assertEqual(len(entries2), 0, entries2)
4368
4369 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004370 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01004371 self.assertRaises(TypeError, os.scandir, obj)
4372
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004373 def test_close(self):
4374 self.create_file("file.txt")
4375 self.create_file("file2.txt")
4376 iterator = os.scandir(self.path)
4377 next(iterator)
4378 iterator.close()
4379 # multiple closes
4380 iterator.close()
4381 with self.check_no_resource_warning():
4382 del iterator
4383
4384 def test_context_manager(self):
4385 self.create_file("file.txt")
4386 self.create_file("file2.txt")
4387 with os.scandir(self.path) as iterator:
4388 next(iterator)
4389 with self.check_no_resource_warning():
4390 del iterator
4391
4392 def test_context_manager_close(self):
4393 self.create_file("file.txt")
4394 self.create_file("file2.txt")
4395 with os.scandir(self.path) as iterator:
4396 next(iterator)
4397 iterator.close()
4398
4399 def test_context_manager_exception(self):
4400 self.create_file("file.txt")
4401 self.create_file("file2.txt")
4402 with self.assertRaises(ZeroDivisionError):
4403 with os.scandir(self.path) as iterator:
4404 next(iterator)
4405 1/0
4406 with self.check_no_resource_warning():
4407 del iterator
4408
4409 def test_resource_warning(self):
4410 self.create_file("file.txt")
4411 self.create_file("file2.txt")
4412 iterator = os.scandir(self.path)
4413 next(iterator)
4414 with self.assertWarns(ResourceWarning):
4415 del iterator
4416 support.gc_collect()
4417 # exhausted iterator
4418 iterator = os.scandir(self.path)
4419 list(iterator)
4420 with self.check_no_resource_warning():
4421 del iterator
4422
Victor Stinner6036e442015-03-08 01:58:04 +01004423
Ethan Furmancdc08792016-06-02 15:06:09 -07004424class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004425
4426 # Abstracted so it can be overridden to test pure Python implementation
4427 # if a C version is provided.
4428 fspath = staticmethod(os.fspath)
4429
Ethan Furmancdc08792016-06-02 15:06:09 -07004430 def test_return_bytes(self):
4431 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004432 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004433
4434 def test_return_string(self):
4435 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004436 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004437
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004438 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004439 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004440 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004441
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004442 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004443 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4444 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4445
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004446 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004447 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4448 self.assertTrue(issubclass(FakePath, os.PathLike))
4449 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004450
Ethan Furmancdc08792016-06-02 15:06:09 -07004451 def test_garbage_in_exception_out(self):
4452 vapor = type('blah', (), {})
4453 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004454 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004455
4456 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004457 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004458
Brett Cannon044283a2016-07-15 10:41:49 -07004459 def test_bad_pathlike(self):
4460 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004461 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004462 # __fspath__ attribute that is not callable.
4463 c = type('foo', (), {})
4464 c.__fspath__ = 1
4465 self.assertRaises(TypeError, self.fspath, c())
4466 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004467 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004468 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004469
Bar Hareleae87e32019-12-22 11:57:27 +02004470 def test_pathlike_subclasshook(self):
4471 # bpo-38878: subclasshook causes subclass checks
4472 # true on abstract implementation.
4473 class A(os.PathLike):
4474 pass
4475 self.assertFalse(issubclass(FakePath, A))
4476 self.assertTrue(issubclass(FakePath, os.PathLike))
4477
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004478 def test_pathlike_class_getitem(self):
Guido van Rossum48b069a2020-04-07 09:50:06 -07004479 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004480
Victor Stinnerc29b5852017-11-02 07:28:27 -07004481
4482class TimesTests(unittest.TestCase):
4483 def test_times(self):
4484 times = os.times()
4485 self.assertIsInstance(times, os.times_result)
4486
4487 for field in ('user', 'system', 'children_user', 'children_system',
4488 'elapsed'):
4489 value = getattr(times, field)
4490 self.assertIsInstance(value, float)
4491
4492 if os.name == 'nt':
4493 self.assertEqual(times.children_user, 0)
4494 self.assertEqual(times.children_system, 0)
4495 self.assertEqual(times.elapsed, 0)
4496
4497
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004498# Only test if the C version is provided, otherwise TestPEP519 already tested
4499# the pure Python implementation.
4500if hasattr(os, "_fspath"):
4501 class TestPEP519PurePython(TestPEP519):
4502
4503 """Explicitly test the pure Python implementation of os.fspath()."""
4504
4505 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004506
4507
Fred Drake2e2be372001-09-20 21:33:42 +00004508if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004509 unittest.main()