blob: 684e308ad3a0596af73d3f8103bc4c7ff4d10526 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00006import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02007import decimal
8import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -07009import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020010import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020011import itertools
12import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000013import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020014import os
15import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010016import select
Victor Stinner47aacc82015-06-12 17:26:23 +020017import shutil
18import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000019import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010020import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010021import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080033from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080034from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030035from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080036from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080037from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070038from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020039
Miss Islington (bot)8bec9fb2021-06-24 16:38:01 -070040with warnings.catch_warnings():
41 warnings.simplefilter('ignore', DeprecationWarning)
42 import asynchat
43 import asyncore
44
Antoine Pitrouec34ab52013-08-16 20:44:38 +020045try:
46 import resource
47except ImportError:
48 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010053try:
54 import _winapi
55except ImportError:
56 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020057try:
R David Murrayf2ad1732014-12-25 18:36:56 -050058 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010060except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050061 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Christian Heimescd9fed62020-11-13 19:48:52 +010067
Berker Peksagce643912015-05-06 06:33:17 +030068from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080069from test.support import unix_shell
70from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000071
Victor Stinner923590e2016-03-24 09:11:48 +010072
# True when os.geteuid() exists and reports effective user id 0 (root).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
88
Victor Stinner923590e2016-03-24 09:11:48 +010089
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    available = hasattr(os, name)
    return unittest.skipUnless(available, f'requires os.{name}')
92
93
def create_file(filename, content=b'content'):
    """Create the new file *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so the call fails if *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
97
98
# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
# Decorator that skips a test when running on AIX.
requires_splice_pipe = unittest.skipIf(
    condition=sys.platform.startswith("aix"),
    reason='on AIX, splice() only accepts sockets',
)
102
103
class MiscTests(unittest.TestCase):
    """Checks for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        # The current directory is reported as text.
        self.assertIsInstance(os.getcwd(), str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        #
        # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
        # longer than PATH_MAX will fail, so aim for a shorter total there.
        target_len = 1000 if sys.platform == 'vxworks' else 2000  # characters
        # 200-character directory name: the prefix padded with 'a'.
        component = 'python_test_dir_'.ljust(200, 'a')

        with tempfile.TemporaryDirectory() as tmpdir, \
                os_helper.change_cwd(tmpdir) as current:
            expected = current

            while True:
                cwd = os.getcwd()
                self.assertEqual(cwd, expected)

                # Characters still missing to reach the target length once
                # a path separator has been added.
                remaining = target_len - (len(cwd) + len(os.path.sep))
                if remaining <= 0:
                    break
                if remaining < len(component):
                    # Last level: shorten the name to hit the target exactly.
                    component = component[:remaining]

                current = os.path.join(current, component)
                try:
                    os.mkdir(current)
                    # On Windows, chdir() can fail
                    # even if mkdir() succeeded
                    os.chdir(current)
                except FileNotFoundError:
                    # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                    # ERROR_FILENAME_EXCED_RANGE (206) errors
                    # ("The filename or extension is too long")
                    break
                except OSError as exc:
                    if exc.errno != errno.ENAMETOOLONG:
                        raise
                    break

                expected = current

            if support.verbose:
                print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        # The bytes variant must decode to the same path as getcwd().
        raw = os.getcwdb()
        self.assertIsInstance(raw, bytes)
        self.assertEqual(os.fsdecode(raw), os.getcwd())
170
171
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for file-descriptor level os functions using os_helper.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  The same routine is reused as
        # tearDown below, so each test both starts and ends without it.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def _call_or_skip_if_enosys(self, func, *args, **kwargs):
        """Return func(*args, **kwargs), skipping the test on ENOSYS.

        Handles the case in which Python was compiled on a system with the
        syscall but without support in the kernel.  Any other OSError is
        re-raised unchanged.
        """
        try:
            return func(*args, **kwargs)
        except OSError as e:
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)

    def test_access(self):
        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of its
        # path argument unchanged.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even if a check above fails.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(*args), close it."""
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args, encoding="utf-8")
        f.close()

    def test_fdopen(self):
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The source must disappear and the destination take its content.
        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r', encoding='utf-8') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_if_enosys(os.copy_file_range, in_fd, out_fd, 5)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        with open(TESTFN2, 'rb') as copied:
            self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_if_enosys(os.copy_file_range,
                                         in_fd, out_fd, bytes_to_copy,
                                         offset_src=in_skip,
                                         offset_dst=out_seek)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # seeked bytes (5) are zero'ed
        self.assertEqual(read[:out_seek], b'\x00'*out_seek)
        # 012 are skipped (in_skip)
        # 345678 are copied in the file (in_skip + bytes_to_copy)
        self.assertEqual(read[out_seek:],
                         data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_invalid_values(self):
        with self.assertRaises(ValueError):
            os.splice(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice(self):
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        i = self._call_or_skip_if_enosys(os.splice, in_fd, write_fd, 5)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, 6))

        self.assertEqual(os.read(read_fd, 100), data[:i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_in(self):
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        i = self._call_or_skip_if_enosys(os.splice,
                                         in_fd, write_fd, bytes_to_copy,
                                         offset_src=in_skip)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        read = os.read(read_fd, 100)
        # 012 are skipped (in_skip)
        # 345678 are moved into the pipe (in_skip + bytes_to_copy)
        self.assertEqual(read, data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    @requires_splice_pipe
    def test_splice_offset_out(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        out_seek = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # Here the pipe is the source: preload it with the data.
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)
        os.write(write_fd, data)

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        i = self._call_or_skip_if_enosys(os.splice,
                                         read_fd, out_fd, bytes_to_copy,
                                         offset_dst=out_seek)

        # The number of copied bytes can be less than
        # the number of bytes originally requested.
        self.assertIn(i, range(0, bytes_to_copy+1))

        with open(TESTFN4, 'rb') as copied:
            read = copied.read()
        # the first out_seek bytes (3) were skipped and read back as zeros
        self.assertEqual(read[:out_seek], b'\x00'*out_seek)
        # the first i bytes taken from the pipe follow, starting at out_seek
        self.assertEqual(read[out_seek:], data[:i])
514
515
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000516# Test attributes on return values from os.*stat* family.
517class StatAttributeTests(unittest.TestCase):
518 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800519 self.fname = os_helper.TESTFN
520 self.addCleanup(os_helper.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100521 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000522
Antoine Pitrou38425292010-09-21 18:19:07 +0000523 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000524 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000525
526 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000527 self.assertEqual(result[stat.ST_SIZE], 3)
528 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000529
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000530 # Make sure all the attributes are there
531 members = dir(result)
532 for name in dir(stat):
533 if name[:3] == 'ST_':
534 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000535 if name.endswith("TIME"):
536 def trunc(x): return int(x)
537 else:
538 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000539 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000540 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000541 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000542
Larry Hastings6fe20b32012-04-19 15:07:49 -0700543 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700544 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700545 for name in 'st_atime st_mtime st_ctime'.split():
546 floaty = int(getattr(result, name) * 100000)
547 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700548 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700549
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000550 try:
551 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200552 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000553 except IndexError:
554 pass
555
556 # Make sure that assignment fails
557 try:
558 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200559 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000560 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000561 pass
562
563 try:
564 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200565 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000566 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000567 pass
568
569 try:
570 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200571 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000572 except AttributeError:
573 pass
574
575 # Use the stat_result constructor with a too-short tuple.
576 try:
577 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200578 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000579 except TypeError:
580 pass
581
Ezio Melotti42da6632011-03-15 05:18:48 +0200582 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000583 try:
584 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
585 except TypeError:
586 pass
587
Antoine Pitrou38425292010-09-21 18:19:07 +0000588 def test_stat_attributes(self):
589 self.check_stat_attributes(self.fname)
590
591 def test_stat_attributes_bytes(self):
592 try:
593 fname = self.fname.encode(sys.getfilesystemencoding())
594 except UnicodeEncodeError:
595 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700596 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000597
Christian Heimes25827622013-10-12 01:27:08 +0200598 def test_stat_result_pickle(self):
599 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200600 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
601 p = pickle.dumps(result, proto)
602 self.assertIn(b'stat_result', p)
603 if proto < 4:
604 self.assertIn(b'cos\nstat_result\n', p)
605 unpickled = pickle.loads(p)
606 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200607
Serhiy Storchaka43767632013-11-03 21:31:38 +0200608 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000609 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700610 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000611
612 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000613 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000614
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000615 # Make sure all the attributes are there.
616 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
617 'ffree', 'favail', 'flag', 'namemax')
618 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000619 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000620
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100621 self.assertTrue(isinstance(result.f_fsid, int))
622
623 # Test that the size of the tuple doesn't change
624 self.assertEqual(len(result), 10)
625
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000626 # Make sure that assignment really fails
627 try:
628 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200629 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000630 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000631 pass
632
633 try:
634 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200635 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000636 except AttributeError:
637 pass
638
639 # Use the constructor with a too-short tuple.
640 try:
641 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200642 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000643 except TypeError:
644 pass
645
Ezio Melotti42da6632011-03-15 05:18:48 +0200646 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000647 try:
648 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
649 except TypeError:
650 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000651
Christian Heimes25827622013-10-12 01:27:08 +0200652 @unittest.skipUnless(hasattr(os, 'statvfs'),
653 "need os.statvfs()")
654 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700655 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200656
Serhiy Storchakabad12572014-12-15 14:03:42 +0200657 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
658 p = pickle.dumps(result, proto)
659 self.assertIn(b'statvfs_result', p)
660 if proto < 4:
661 self.assertIn(b'cos\nstatvfs_result\n', p)
662 unpickled = pickle.loads(p)
663 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200664
Serhiy Storchaka43767632013-11-03 21:31:38 +0200665 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
666 def test_1686475(self):
667 # Verify that an open file can be stat'ed
668 try:
669 os.stat(r"c:\pagefile.sys")
670 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600671 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200672 except OSError as e:
673 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000674
Serhiy Storchaka43767632013-11-03 21:31:38 +0200675 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
676 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
677 def test_15261(self):
678 # Verify that stat'ing a closed fd does not cause crash
679 r, w = os.pipe()
680 try:
681 os.stat(r) # should not raise error
682 finally:
683 os.close(r)
684 os.close(w)
685 with self.assertRaises(OSError) as ctx:
686 os.stat(r)
687 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100688
Zachary Ware63f277b2014-06-19 09:46:37 -0500689 def check_file_attributes(self, result):
690 self.assertTrue(hasattr(result, 'st_file_attributes'))
691 self.assertTrue(isinstance(result.st_file_attributes, int))
692 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
693
@unittest.skipUnless(sys.platform == "win32",
                     "st_file_attributes is Win32 specific")
def test_file_attributes(self):
    directory_flag = stat.FILE_ATTRIBUTE_DIRECTORY

    # A regular file: FILE_ATTRIBUTE_DIRECTORY must not be set.
    file_stat = os.stat(self.fname)
    self.check_file_attributes(file_stat)
    self.assertEqual(file_stat.st_file_attributes & directory_flag, 0)

    # A directory: FILE_ATTRIBUTE_DIRECTORY must be set.
    dir_path = os_helper.TESTFN + "dir"
    os.mkdir(dir_path)
    self.addCleanup(os.rmdir, dir_path)

    dir_stat = os.stat(dir_path)
    self.check_file_attributes(dir_stat)
    self.assertEqual(dir_stat.st_file_attributes & directory_flag,
                     directory_flag)
714
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_access_denied(self):
    # os.stat() must default to FindFirstFile WIN32_FIND_DATA when access
    # is denied.  See issue 28075.
    # os.environ['TEMP'] should be located on a volume that supports
    # file ACLs.
    target = os.path.join(os.environ['TEMP'], self.fname)
    self.addCleanup(os_helper.unlink, target)
    create_file(target, b'ABC')

    # Deny the right to [S]YNCHRONIZE on the file to force CreateFile to
    # fail with ERROR_ACCESS_DENIED.
    # bpo-30584: Use security identifier *S-1-5-32-545 instead of
    # localized "Users" to not depend on the locale.
    DETACHED_PROCESS = 8
    deny_command = ['icacls.exe', target, '/deny', '*S-1-5-32-545:(S)']
    subprocess.check_call(deny_command, creationflags=DETACHED_PROCESS)

    # The size must still be reported even though the file cannot be opened.
    stat_result = os.stat(target)
    self.assertNotEqual(stat_result.st_size, 0)
735
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_stat_block_device(self):
    # bpo-38030: os.stat fails for block devices
    # Build a device name such as "//./C:" from the drive of the
    # current working directory.
    drive, _ = os.path.splitdrive(os.getcwd())
    device_stat = os.stat("//./" + drive)
    self.assertEqual(device_stat.st_mode, stat.S_IFBLK)
743
Victor Stinner47aacc82015-06-12 17:26:23 +0200744
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() and its various calling conventions."""

    def setUp(self):
        # Every test works on a fresh directory holding a single file; the
        # whole tree is removed again at cleanup time.
        self.dirname = os_helper.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(os_helper.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Common driver: call set_time(filename, (atime_ns, mtime_ns)) and
        # check that os.stat() reports the timestamps that were requested.
        # 'filename' defaults to the test file created by setUp().
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Common driver: set_time(filename) must set the modification time
        # of the test file to (approximately) the current system clock.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way.  Use the file name
            # handed in by the driver, like the other set_time() helpers.
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the name of the filesystem holding 'path' (for example
        # "NTFS"), or None if the filesystem is unknown.  Only Windows is
        # queried; every other platform yields None.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        # times and ns must each be a tuple of exactly two items
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # Options that the platform does not support must be rejected.
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            # Build an object whose __divmod__ returns the given value.
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
965
Victor Stinner47aacc82015-06-12 17:26:23 +0200966
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000967from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000968
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000969class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000970 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000971 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000972
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000973 def setUp(self):
974 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000975 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000976 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000977 for key, value in self._reference().items():
978 os.environ[key] = value
979
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000980 def tearDown(self):
981 os.environ.clear()
982 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000983 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000984 os.environb.clear()
985 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000986
Christian Heimes90333392007-11-01 19:08:42 +0000987 def _reference(self):
988 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
989
990 def _empty_mapping(self):
991 os.environ.clear()
992 return os.environ
993
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000994 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200995 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
996 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +0800997 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Martin v. Löwis5510f652005-02-17 21:23:20 +0000998 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000999 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +03001000 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +02001001 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +03001002 value = popen.read().strip()
1003 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +00001004
Xavier de Gayed1415312016-07-22 12:15:29 +02001005 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1006 'requires a shell')
pxinwre1e3c2d2020-12-16 05:20:07 +08001007 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
Christian Heimes1a13d592007-11-08 14:16:55 +00001008 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +02001009 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
1010 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +03001011 it = iter(popen)
1012 self.assertEqual(next(it), "line1\n")
1013 self.assertEqual(next(it), "line2\n")
1014 self.assertEqual(next(it), "line3\n")
1015 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +00001016
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001017 # Verify environ keys and values from the OS are of the
1018 # correct str type.
1019 def test_keyvalue_types(self):
1020 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001021 self.assertEqual(type(key), str)
1022 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +00001023
Christian Heimes90333392007-11-01 19:08:42 +00001024 def test_items(self):
1025 for key, value in self._reference().items():
1026 self.assertEqual(os.environ.get(key), value)
1027
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001028 # Issue 7310
1029 def test___repr__(self):
1030 """Check that the repr() of os.environ looks like environ({...})."""
1031 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +00001032 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
1033 '{!r}: {!r}'.format(key, value)
1034 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001035
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +00001036 def test_get_exec_path(self):
1037 defpath_list = os.defpath.split(os.pathsep)
1038 test_path = ['/monty', '/python', '', '/flying/circus']
1039 test_env = {'PATH': os.pathsep.join(test_path)}
1040
1041 saved_environ = os.environ
1042 try:
1043 os.environ = dict(test_env)
1044 # Test that defaulting to os.environ works.
1045 self.assertSequenceEqual(test_path, os.get_exec_path())
1046 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
1047 finally:
1048 os.environ = saved_environ
1049
1050 # No PATH environment variable
1051 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
1052 # Empty PATH environment variable
1053 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
1054 # Supplied PATH environment variable
1055 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
1056
Victor Stinnerb745a742010-05-18 17:17:23 +00001057 if os.supports_bytes_environ:
1058 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +00001059 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001060 # ignore BytesWarning warning
1061 with warnings.catch_warnings(record=True):
1062 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +00001063 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +00001064 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +00001065 pass
1066 else:
1067 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +00001068
1069 # bytes key and/or value
1070 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
1071 ['abc'])
1072 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
1073 ['abc'])
1074 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
1075 ['abc'])
1076
1077 @unittest.skipUnless(os.supports_bytes_environ,
1078 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +00001079 def test_environb(self):
1080 # os.environ -> os.environb
1081 value = 'euro\u20ac'
1082 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001083 value_bytes = value.encode(sys.getfilesystemencoding(),
1084 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +00001085 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +00001086 msg = "U+20AC character is not encodable to %s" % (
1087 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +00001088 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +00001089 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001090 self.assertEqual(os.environ['unicode'], value)
1091 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +00001092
1093 # os.environb -> os.environ
1094 value = b'\xff'
1095 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +00001096 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +00001097 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001098 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +00001099
Victor Stinner161e7b32020-01-24 11:53:44 +01001100 def test_putenv_unsetenv(self):
1101 name = "PYTHONTESTVAR"
1102 value = "testvalue"
1103 code = f'import os; print(repr(os.environ.get({name!r})))'
1104
Hai Shi0c4f0f32020-06-30 21:46:31 +08001105 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner161e7b32020-01-24 11:53:44 +01001106 env.pop(name, None)
1107
1108 os.putenv(name, value)
1109 proc = subprocess.run([sys.executable, '-c', code], check=True,
1110 stdout=subprocess.PIPE, text=True)
1111 self.assertEqual(proc.stdout.rstrip(), repr(value))
1112
1113 os.unsetenv(name)
1114 proc = subprocess.run([sys.executable, '-c', code], check=True,
1115 stdout=subprocess.PIPE, text=True)
1116 self.assertEqual(proc.stdout.rstrip(), repr(None))
1117
Victor Stinner13ff2452018-01-22 18:32:50 +01001118 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +01001119 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +01001120 def test_putenv_unsetenv_error(self):
1121 # Empty variable name is invalid.
1122 # "=" and null character are not allowed in a variable name.
1123 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
1124 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
1125 self.assertRaises((OSError, ValueError), os.unsetenv, name)
1126
Victor Stinnerb73dd022020-01-22 21:11:17 +01001127 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +01001128 # On Windows, an environment variable string ("name=value" string)
1129 # is limited to 32,767 characters
1130 longstr = 'x' * 32_768
1131 self.assertRaises(ValueError, os.putenv, longstr, "1")
1132 self.assertRaises(ValueError, os.putenv, "X", longstr)
1133 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +01001134
Victor Stinner6d101392013-04-14 16:35:04 +02001135 def test_key_type(self):
1136 missing = 'missingkey'
1137 self.assertNotIn(missing, os.environ)
1138
Victor Stinner839e5ea2013-04-14 16:43:03 +02001139 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001140 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001141 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001142 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001143
Victor Stinner839e5ea2013-04-14 16:43:03 +02001144 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001145 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001146 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001147 self.assertTrue(cm.exception.__suppress_context__)
1148
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001149 def _test_environ_iteration(self, collection):
1150 iterator = iter(collection)
1151 new_key = "__new_key__"
1152
1153 next(iterator) # start iteration over os.environ.items
1154
1155 # add a new key in os.environ mapping
1156 os.environ[new_key] = "test_environ_iteration"
1157
1158 try:
1159 next(iterator) # force iteration over modified mapping
1160 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1161 finally:
1162 del os.environ[new_key]
1163
1164 def test_iter_error_when_changing_os_environ(self):
1165 self._test_environ_iteration(os.environ)
1166
1167 def test_iter_error_when_changing_os_environ_items(self):
1168 self._test_environ_iteration(os.environ.items())
1169
1170 def test_iter_error_when_changing_os_environ_values(self):
1171 self._test_environ_iteration(os.environ.values())
1172
Charles Burklandd648ef12020-03-13 09:04:43 -07001173 def _test_underlying_process_env(self, var, expected):
1174 if not (unix_shell and os.path.exists(unix_shell)):
1175 return
1176
1177 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1178 value = popen.read().strip()
1179
1180 self.assertEqual(expected, value)
1181
1182 def test_or_operator(self):
1183 overridden_key = '_TEST_VAR_'
1184 original_value = 'original_value'
1185 os.environ[overridden_key] = original_value
1186
1187 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1188 expected = dict(os.environ)
1189 expected.update(new_vars_dict)
1190
1191 actual = os.environ | new_vars_dict
1192 self.assertDictEqual(expected, actual)
1193 self.assertEqual('3', actual[overridden_key])
1194
1195 new_vars_items = new_vars_dict.items()
1196 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1197
1198 self._test_underlying_process_env('_A_', '')
1199 self._test_underlying_process_env(overridden_key, original_value)
1200
1201 def test_ior_operator(self):
1202 overridden_key = '_TEST_VAR_'
1203 os.environ[overridden_key] = 'original_value'
1204
1205 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1206 expected = dict(os.environ)
1207 expected.update(new_vars_dict)
1208
1209 os.environ |= new_vars_dict
1210 self.assertEqual(expected, os.environ)
1211 self.assertEqual('3', os.environ[overridden_key])
1212
1213 self._test_underlying_process_env('_A_', '1')
1214 self._test_underlying_process_env(overridden_key, '3')
1215
1216 def test_ior_operator_invalid_dicts(self):
1217 os_environ_copy = os.environ.copy()
1218 with self.assertRaises(TypeError):
1219 dict_with_bad_key = {1: '_A_'}
1220 os.environ |= dict_with_bad_key
1221
1222 with self.assertRaises(TypeError):
1223 dict_with_bad_val = {'_A_': 1}
1224 os.environ |= dict_with_bad_val
1225
1226 # Check nothing was added.
1227 self.assertEqual(os_environ_copy, os.environ)
1228
1229 def test_ior_operator_key_value_iterable(self):
1230 overridden_key = '_TEST_VAR_'
1231 os.environ[overridden_key] = 'original_value'
1232
1233 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1234 expected = dict(os.environ)
1235 expected.update(new_vars_items)
1236
1237 os.environ |= new_vars_items
1238 self.assertEqual(expected, os.environ)
1239 self.assertEqual('3', os.environ[overridden_key])
1240
1241 self._test_underlying_process_env('_A_', '1')
1242 self._test_underlying_process_env(overridden_key, '3')
1243
1244 def test_ror_operator(self):
1245 overridden_key = '_TEST_VAR_'
1246 original_value = 'original_value'
1247 os.environ[overridden_key] = original_value
1248
1249 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1250 expected = dict(new_vars_dict)
1251 expected.update(os.environ)
1252
1253 actual = new_vars_dict | os.environ
1254 self.assertDictEqual(expected, actual)
1255 self.assertEqual(original_value, actual[overridden_key])
1256
1257 new_vars_items = new_vars_dict.items()
1258 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1259
1260 self._test_underlying_process_env('_A_', '')
1261 self._test_underlying_process_env(overridden_key, original_value)
1262
Victor Stinner6d101392013-04-14 16:35:04 +02001263
Tim Petersc4e09402003-04-25 07:11:48 +00001264class WalkTests(unittest.TestCase):
1265 """Tests for os.walk()."""
1266
# Wrapper to hide minor differences between os.walk and os.fwalk
# to test both functions with the same code base
def walk(self, top, **kwargs):
    # os.walk() spells the option 'followlinks'; translate the
    # 'follow_symlinks' spelling accepted by this wrapper, if present.
    try:
        kwargs['followlinks'] = kwargs.pop('follow_symlinks')
    except KeyError:
        pass
    return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001273
def setUp(self):
    # Create the directory tree that every walk test in this class
    # traverses.  The whole tree is removed again at cleanup time.
    join = os.path.join
    self.addCleanup(os_helper.rmtree, os_helper.TESTFN)

    # Build:
    #     TESTFN/
    #       TEST1/              a file kid and two directory kids
    #         tmp1
    #         SUB1/             a file kid and a directory kid
    #           tmp2
    #           SUB11/          no kids
    #         SUB2/             a file kid and a dirsymlink kid
    #           tmp3
    #           SUB21/          not readable
    #             tmp5
    #           link/           a symlink to TESTFN/TEST2
    #           broken_link
    #           broken_link2
    #           broken_link3
    #       TEST2/
    #         tmp4              a lone file
    self.walk_path = join(os_helper.TESTFN, "TEST1")
    self.sub1_path = join(self.walk_path, "SUB1")
    self.sub11_path = join(self.sub1_path, "SUB11")
    sub2_path = join(self.walk_path, "SUB2")
    sub21_path = join(sub2_path, "SUB21")
    tmp1_path = join(self.walk_path, "tmp1")
    tmp2_path = join(self.sub1_path, "tmp2")
    tmp3_path = join(sub2_path, "tmp3")
    # NOTE(review): the diagram above calls this file "tmp5", but it is
    # created on disk as SUB21/tmp3 -- confirm which one is intended.
    tmp5_path = join(sub21_path, "tmp3")
    self.link_path = join(sub2_path, "link")
    t2_path = join(os_helper.TESTFN, "TEST2")
    tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
    broken_link_path = join(sub2_path, "broken_link")
    broken_link2_path = join(sub2_path, "broken_link2")
    broken_link3_path = join(sub2_path, "broken_link3")

    # Create stuff.
    os.makedirs(self.sub11_path)
    os.makedirs(sub2_path)
    os.makedirs(sub21_path)
    os.makedirs(t2_path)

    # Mode "x" makes open() fail if a file is unexpectedly already there.
    for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
        with open(path, "x", encoding='utf-8') as f:
            f.write("I'm " + path + " and proud of it. Blame test_os.\n")

    # self.sub2_tree is the (root, dirs, files) triple that the walk tests
    # compare against the entry reported for SUB2.
    if os_helper.can_symlink():
        os.symlink(os.path.abspath(t2_path), self.link_path)
        os.symlink('broken', broken_link_path, True)
        os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
        os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
        self.sub2_tree = (sub2_path, ["SUB21", "link"],
                          ["broken_link", "broken_link2", "broken_link3",
                           "tmp3"])
    else:
        self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

    # Remove all permissions from SUB21, then probe whether that really
    # makes the directory unreadable for this process.
    os.chmod(sub21_path, 0)
    try:
        os.listdir(sub21_path)
    except PermissionError:
        # Unreadable as intended: restore the permissions at cleanup time.
        self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
    else:
        # The directory is still readable: drop SUB21 from the fixture
        # and from the expected SUB2 entry.
        os.chmod(sub21_path, stat.S_IRWXU)
        os.unlink(tmp5_path)
        os.rmdir(sub21_path)
        # "SUB21" is the first name in the expected directory list.
        del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001342
Victor Stinner0561c532015-03-12 10:28:24 +01001343 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001344 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001345 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001346
Tim Petersc4e09402003-04-25 07:11:48 +00001347 self.assertEqual(len(all), 4)
1348 # We can't know which order SUB1 and SUB2 will appear in.
1349 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1350 # flipped: TESTFN, SUB2, SUB1, SUB11
1351 flipped = all[0][1][0] != "SUB1"
1352 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001353 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001354 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001355 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1356 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1357 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1358 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001359
Brett Cannon3f9183b2016-08-26 14:44:48 -07001360 def test_walk_prune(self, walk_path=None):
1361 if walk_path is None:
1362 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001363 # Prune the search.
1364 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001365 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001366 all.append((root, dirs, files))
1367 # Don't descend into SUB1.
1368 if 'SUB1' in dirs:
1369 # Note that this also mutates the dirs we appended to all!
1370 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001371
Victor Stinner0561c532015-03-12 10:28:24 +01001372 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001373 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001374
1375 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001376 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001377 self.assertEqual(all[1], self.sub2_tree)
1378
def test_file_like_path(self):
    # Run the pruning test again with the start directory wrapped in a
    # FakePath object instead of being passed as a plain string.
    start = FakePath(self.walk_path)
    self.test_walk_prune(start)
Brett Cannon3f9183b2016-08-26 14:44:48 -07001381
Victor Stinner0561c532015-03-12 10:28:24 +01001382 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001383 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001384 all = list(self.walk(self.walk_path, topdown=False))
1385
Victor Stinner53b0a412016-03-26 01:12:36 +01001386 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001387 # We can't know which order SUB1 and SUB2 will appear in.
1388 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1389 # flipped: SUB2, SUB11, SUB1, TESTFN
1390 flipped = all[3][1][0] != "SUB1"
1391 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001392 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001393 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001394 self.assertEqual(all[3],
1395 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1396 self.assertEqual(all[flipped],
1397 (self.sub11_path, [], []))
1398 self.assertEqual(all[flipped + 1],
1399 (self.sub1_path, ["SUB11"], ["tmp2"]))
1400 self.assertEqual(all[2 - 2 * flipped],
1401 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001402
def test_walk_symlink(self):
    if not os_helper.can_symlink():
        self.skipTest("need symlink support")

    # Walk, following symlinks, and stop at the first entry whose root is
    # the link path.
    entries = self.walk(self.walk_path, follow_symlinks=True)
    found = next(
        (entry for entry in entries if entry[0] == self.link_path), None)
    if found is None:
        self.fail("Didn't follow symlink with followlinks=True")

    _, dirs, files = found
    self.assertEqual(dirs, [])
    self.assertEqual(files, ["tmp4"])
Guido van Rossumd8faa362007-04-27 19:54:29 +00001416
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001417 def test_walk_bad_dir(self):
1418 # Walk top-down.
1419 errors = []
1420 walk_it = self.walk(self.walk_path, onerror=errors.append)
1421 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001422 self.assertEqual(errors, [])
1423 dir1 = 'SUB1'
1424 path1 = os.path.join(root, dir1)
1425 path1new = os.path.join(root, dir1 + '.new')
1426 os.rename(path1, path1new)
1427 try:
1428 roots = [r for r, d, f in walk_it]
1429 self.assertTrue(errors)
1430 self.assertNotIn(path1, roots)
1431 self.assertNotIn(path1new, roots)
1432 for dir2 in dirs:
1433 if dir2 != dir1:
1434 self.assertIn(os.path.join(root, dir2), roots)
1435 finally:
1436 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001437
def test_walk_many_open_files(self):
    # Advance 100 walkers in lock-step over the same chain of 30 nested
    # 'd' directories, so all of them are suspended part-way at once.
    depth = 30
    walker_count = 100
    base = os.path.join(os_helper.TESTFN, 'deep')
    deepest = os.path.join(base, *(['d'] * depth))
    os.makedirs(deepest)

    # Bottom-up: the deepest directory comes first, ``base`` comes last.
    iters = [self.walk(base, topdown=False) for _ in range(walker_count)]
    current = deepest
    for level in range(depth + 1):
        # Only the deepest directory (level 0) has no 'd' child.
        subdirs = ['d'] if level else []
        expected = (current, subdirs, [])
        for it in iters:
            self.assertEqual(next(it), expected)
        current = os.path.dirname(current)

    # Top-down: ``base`` comes first, the deepest directory comes last.
    iters = [self.walk(base, topdown=True) for _ in range(walker_count)]
    current = base
    for level in range(depth + 1):
        subdirs = ['d'] if level < depth else []
        expected = (current, subdirs, [])
        for it in iters:
            self.assertEqual(next(it), expected)
        current = os.path.join(current, 'd')
1458
Charles-François Natali7372b062012-02-05 15:15:38 +01001459
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Yield (root, dirs, files) from fwalk(), dropping the directory fd.

        This lets the tests inherited from WalkTests run against fwalk().
        """
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); BytesFwalkTests overrides this hook."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dictionaries are copied before use.  Every combination
        of topdown and symlink-following is checked: each root reported by
        fwalk() must also be reported by os.walk() with the same sets of
        directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The two functions spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': os_helper.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind a NameError on ``fd``.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': os_helper.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # Positional arguments: top, topdown, onerror.
            args = os_helper.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(os_helper.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1530
1531
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # os.walk() spells the option "followlinks"; translate the
        # "follow_symlinks" spelling when a caller used that one.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy whatever the consumer did to the decoded lists back
            # into the bytes lists that os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1544
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on the bytes form of *top*, yielding str names.

        Each yielded item is (root, dirs, files, topfd) with the three name
        fields decoded through os.fsdecode().
        """
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # Copy whatever the consumer did to the decoded lists back
            # into the bytes lists that os.fwalk() handed out.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1556
Charles-François Natali7372b062012-02-05 15:15:38 +01001557
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The leaf gets the requested mode; the intermediate
                # directory is expected to end up with 0o775.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # Restore the umask even when an assertion above fails, so the
            # changed mask cannot leak into later tests.
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w', encoding='utf-8') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always delete the file: tearDown() calls os.removedirs(),
            # which cannot remove a regular file left behind here.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1650
Andrew Svetlov405faed2012-12-25 12:18:09 +02001651
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory shared by all tests."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named ``st`` so the local does not shadow the ``stat`` module.
        st = os.stat(os_helper.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # Numeric values that are not integers must be rejected in either
        # position.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value)
        self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
        self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(os_helper.TESTFN).st_uid

        os.chown(os_helper.TESTFN, uid, gid_1)
        gid = os.stat(os_helper.TESTFN).st_gid
        self.assertEqual(gid, gid_1)

        os.chown(os_helper.TESTFN, uid, gid_2)
        gid = os.stat(os_helper.TESTFN).st_gid
        self.assertEqual(gid, gid_2)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        os.chown(os_helper.TESTFN, uid_1, gid)
        uid = os.stat(os_helper.TESTFN).st_uid
        self.assertEqual(uid, uid_1)
        os.chown(os_helper.TESTFN, uid_2, gid)
        uid = os.stat(os_helper.TESTFN).st_uid
        self.assertEqual(uid, uid_2)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        st = os.stat(os_helper.TESTFN)
        gid = st.st_gid
        # Each uid gets its own assertRaises block; sharing one block would
        # leave the second call unexecuted as soon as the first one raises.
        for uid in (uid_1, uid_2):
            if uid == st.st_uid:
                # Re-assigning the current owner changes nothing, so it is
                # not expected to be refused.
                continue
            with self.subTest(uid=uid), self.assertRaises(PermissionError):
                os.chown(os_helper.TESTFN, uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001710
1711
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return the two new paths."""
        outer = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # All directories are empty, so the whole chain is removed.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for removed in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        # A file inside dira stops the removal right after dirb.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file inside dirb makes the call fail without removing anything.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001751
1752
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writing to the null device must succeed.  The with statement
        # closes the file, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from it yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001760
Andrew Svetlov405faed2012-12-25 12:18:09 +02001761
Guido van Rossume7ba4952007-06-06 23:52:48 +00001762class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001763 def test_urandom_length(self):
1764 self.assertEqual(len(os.urandom(0)), 0)
1765 self.assertEqual(len(os.urandom(1)), 1)
1766 self.assertEqual(len(os.urandom(10)), 10)
1767 self.assertEqual(len(os.urandom(100)), 100)
1768 self.assertEqual(len(os.urandom(1000)), 1000)
1769
1770 def test_urandom_value(self):
1771 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001772 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001773 data2 = os.urandom(16)
1774 self.assertNotEqual(data1, data2)
1775
1776 def get_urandom_subprocess(self, count):
1777 code = '\n'.join((
1778 'import os, sys',
1779 'data = os.urandom(%s)' % count,
1780 'sys.stdout.buffer.write(data)',
1781 'sys.stdout.buffer.flush()'))
1782 out = assert_python_ok('-c', code)
1783 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001784 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001785 return stdout
1786
1787 def test_urandom_subprocess(self):
1788 data1 = self.get_urandom_subprocess(16)
1789 data2 = self.get_urandom_subprocess(16)
1790 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001791
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001792
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1834
1835
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in (
        'HAVE_GETENTROPY',
        'HAVE_GETRANDOM',
        'HAVE_GETRANDOM_SYSCALL',
    )
)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001842
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter, because each one tampers
    with process-wide state (the descriptor limit or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters; its output is unused.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        # Within one run the two 4-byte reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not repeat the first one.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1922
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001923
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups fails, nothing has been patched yet and the finally clause
    # must not run with unbound names.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1956
pxinwrf2d7ac72019-05-21 18:46:37 +08001957@unittest.skipUnless(hasattr(os, 'execv'),
1958 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001959class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
                 "avoid triggering a linuxthreads bug: see issue #4970")
def test_execvpe_with_bad_program(self):
    # Asking for a program that cannot be found must raise OSError.
    missing_program = 'no such app-'
    with self.assertRaises(OSError):
        os.execvpe(missing_program, [missing_program], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001965
Steve Dowerbce26262016-11-19 19:17:26 -08001966 def test_execv_with_bad_arglist(self):
1967 self.assertRaises(ValueError, os.execv, 'notepad', ())
1968 self.assertRaises(ValueError, os.execv, 'notepad', [])
1969 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1970 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1971
Thomas Heller6790d602007-08-30 17:15:14 +00001972 def test_execvpe_with_bad_arglist(self):
1973 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001974 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1975 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001976
@unittest.skipUnless(hasattr(os, '_execvpe'),
                     "No internal os._execvpe function to test.")
def _test_internal_execvpe(self, test_type):
    # Drive os._execvpe() with execv/execve stubbed out by
    # _execvpe_mockup() and check which stub was called and with what.
    # ``test_type`` is ``bytes`` to use a bytes program name; any other
    # value selects str.
    use_bytes = test_type is bytes
    program_path = os.sep + 'absolutepath'
    if use_bytes:
        program = b'executable'
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
        arguments = [b'progname', 'arg1', 'arg2']
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # The relative-path cases below expect the bytes form of the path
        # everywhere except on Windows.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path = {**env, path_key: program_path}
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env_path)))
2025
2026 def test_internal_execvpe_str(self):
2027 self._test_internal_execvpe(str)
2028 if os.name != "nt":
2029 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00002030
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002031 def test_execve_invalid_env(self):
2032 args = [sys.executable, '-c', 'pass']
2033
Ville Skyttä49b27342017-08-03 09:00:59 +03002034 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002035 newenv = os.environ.copy()
2036 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2037 with self.assertRaises(ValueError):
2038 os.execve(args[0], args, newenv)
2039
Ville Skyttä49b27342017-08-03 09:00:59 +03002040 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002041 newenv = os.environ.copy()
2042 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2043 with self.assertRaises(ValueError):
2044 os.execve(args[0], args, newenv)
2045
Ville Skyttä49b27342017-08-03 09:00:59 +03002046 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002047 newenv = os.environ.copy()
2048 newenv["FRUIT=ORANGE"] = "lemon"
2049 with self.assertRaises(ValueError):
2050 os.execve(args[0], args, newenv)
2051
Alexey Izbyshev83460312018-10-20 03:28:22 +03002052 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
2053 def test_execve_with_empty_path(self):
2054 # bpo-32890: Check GetLastError() misuse
2055 try:
2056 os.execve('', ['arg'], {})
2057 except OSError as e:
2058 self.assertTrue(e.winerror is None or e.winerror != 0)
2059 else:
2060 self.fail('No OSError raised')
2061
Gregory P. Smith4ae37772010-05-08 18:05:46 +00002062
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for an unusable path.

    Most tests operate on os_helper.TESTFN while it does not exist;
    test_mkdir instead creates it as a regular file first.
    """

    def setUp(self):
        # Every test starts from the premise that TESTFN is absent.
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          os_helper.TESTFN, os_helper.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # "x" creates the file exclusively; mkdir on that name must fail.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002097
Victor Stinnere77c9742016-03-25 10:28:23 +01002098
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002099class TestInvalidFD(unittest.TestCase):
Inada Naokia6925652021-04-29 11:35:36 +09002100 singles = ["fchdir", "dup", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002101 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2102 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07002103 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002104 def get_single(f):
2105 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00002106 if hasattr(os, f):
2107 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002108 return helper
2109 for f in singles:
2110 locals()["test_"+f] = get_single(f)
2111
Inada Naokia6925652021-04-29 11:35:36 +09002112 def check(self, f, *args, **kwargs):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002113 try:
Inada Naokia6925652021-04-29 11:35:36 +09002114 f(os_helper.make_bad_fd(), *args, **kwargs)
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002115 except OSError as e:
2116 self.assertEqual(e.errno, errno.EBADF)
2117 else:
Martin Panter7462b6492015-11-02 03:37:02 +00002118 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00002119 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00002120
Inada Naokia6925652021-04-29 11:35:36 +09002121 def test_fdopen(self):
2122 self.check(os.fdopen, encoding="utf-8")
2123
Serhiy Storchaka43767632013-11-03 21:31:38 +02002124 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002125 def test_isatty(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002126 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002127
Serhiy Storchaka43767632013-11-03 21:31:38 +02002128 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002129 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002130 fd = os_helper.make_bad_fd()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002131 # Make sure none of the descriptors we are about to close are
2132 # currently valid (issue 6542).
2133 for i in range(10):
2134 try: os.fstat(fd+i)
2135 except OSError:
2136 pass
2137 else:
2138 break
2139 if i < 2:
2140 raise unittest.SkipTest(
2141 "Unable to acquire a range of invalid file descriptors")
2142 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002143
Serhiy Storchaka43767632013-11-03 21:31:38 +02002144 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002145 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002146 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002147
Serhiy Storchaka43767632013-11-03 21:31:38 +02002148 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002149 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002150 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002151
Serhiy Storchaka43767632013-11-03 21:31:38 +02002152 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002153 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002154 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002155
Serhiy Storchaka43767632013-11-03 21:31:38 +02002156 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002157 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002158 self.check(os.pathconf, "PC_NAME_MAX")
2159 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002160
Serhiy Storchaka43767632013-11-03 21:31:38 +02002161 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002162 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002163 self.check(os.truncate, 0)
2164 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002165
Serhiy Storchaka43767632013-11-03 21:31:38 +02002166 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002167 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002168 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002169
Serhiy Storchaka43767632013-11-03 21:31:38 +02002170 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002171 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002172 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002173
Victor Stinner57ddf782014-01-08 15:21:28 +01002174 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2175 def test_readv(self):
2176 buf = bytearray(10)
2177 self.check(os.readv, [buf])
2178
Serhiy Storchaka43767632013-11-03 21:31:38 +02002179 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002180 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002181 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002182
Serhiy Storchaka43767632013-11-03 21:31:38 +02002183 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002184 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002185 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002186
Victor Stinner57ddf782014-01-08 15:21:28 +01002187 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2188 def test_writev(self):
2189 self.check(os.writev, [b'abc'])
2190
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002191 def test_inheritable(self):
2192 self.check(os.get_inheritable)
2193 self.check(os.set_inheritable, True)
2194
2195 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2196 'needs os.get_blocking() and os.set_blocking()')
2197 def test_blocking(self):
2198 self.check(os.get_blocking)
2199 self.check(os.set_blocking, True)
2200
Brian Curtin1b9df392010-11-24 20:24:31 +00002201
2202class LinkTests(unittest.TestCase):
2203 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002204 self.file1 = os_helper.TESTFN
2205 self.file2 = os.path.join(os_helper.TESTFN + "2")
Brian Curtin1b9df392010-11-24 20:24:31 +00002206
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002207 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002208 for file in (self.file1, self.file2):
2209 if os.path.exists(file):
2210 os.unlink(file)
2211
Brian Curtin1b9df392010-11-24 20:24:31 +00002212 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002213 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002214
xdegaye6a55d092017-11-12 17:57:04 +01002215 try:
2216 os.link(file1, file2)
2217 except PermissionError as e:
2218 self.skipTest('os.link(): %s' % e)
Inada Naokia6925652021-04-29 11:35:36 +09002219 with open(file1, "rb") as f1, open(file2, "rb") as f2:
Brian Curtin1b9df392010-11-24 20:24:31 +00002220 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2221
2222 def test_link(self):
2223 self._test_link(self.file1, self.file2)
2224
2225 def test_link_bytes(self):
2226 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2227 bytes(self.file2, sys.getfilesystemencoding()))
2228
Brian Curtinf498b752010-11-30 15:54:04 +00002229 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002230 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002231 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002232 except UnicodeError:
2233 raise unittest.SkipTest("Unable to encode for this platform.")
2234
Brian Curtinf498b752010-11-30 15:54:04 +00002235 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002236 self.file2 = self.file1 + "2"
2237 self._test_link(self.file1, self.file2)
2238
Serhiy Storchaka43767632013-11-03 21:31:38 +02002239@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2240class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002241 # uid_t and gid_t are 32-bit unsigned integers on Linux
2242 UID_OVERFLOW = (1 << 32)
2243 GID_OVERFLOW = (1 << 32)
2244
Serhiy Storchaka43767632013-11-03 21:31:38 +02002245 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2246 def test_setuid(self):
2247 if os.getuid() != 0:
2248 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002249 self.assertRaises(TypeError, os.setuid, 'not an int')
2250 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002251
Serhiy Storchaka43767632013-11-03 21:31:38 +02002252 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2253 def test_setgid(self):
2254 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2255 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002256 self.assertRaises(TypeError, os.setgid, 'not an int')
2257 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002258
Serhiy Storchaka43767632013-11-03 21:31:38 +02002259 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2260 def test_seteuid(self):
2261 if os.getuid() != 0:
2262 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002263 self.assertRaises(TypeError, os.setegid, 'not an int')
2264 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002265
Serhiy Storchaka43767632013-11-03 21:31:38 +02002266 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2267 def test_setegid(self):
2268 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2269 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002270 self.assertRaises(TypeError, os.setegid, 'not an int')
2271 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002272
Serhiy Storchaka43767632013-11-03 21:31:38 +02002273 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2274 def test_setreuid(self):
2275 if os.getuid() != 0:
2276 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002277 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2278 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2279 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2280 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002281
Serhiy Storchaka43767632013-11-03 21:31:38 +02002282 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2283 def test_setreuid_neg1(self):
2284 # Needs to accept -1. We run this in a subprocess to avoid
2285 # altering the test runner's process state (issue8045).
2286 subprocess.check_call([
2287 sys.executable, '-c',
2288 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002289
Serhiy Storchaka43767632013-11-03 21:31:38 +02002290 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2291 def test_setregid(self):
2292 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2293 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002294 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2295 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2296 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2297 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002298
Serhiy Storchaka43767632013-11-03 21:31:38 +02002299 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2300 def test_setregid_neg1(self):
2301 # Needs to accept -1. We run this in a subprocess to avoid
2302 # altering the test runner's process state (issue8045).
2303 subprocess.check_call([
2304 sys.executable, '-c',
2305 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002306
Serhiy Storchaka43767632013-11-03 21:31:38 +02002307@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2308class Pep383Tests(unittest.TestCase):
2309 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002310 if os_helper.TESTFN_UNENCODABLE:
2311 self.dir = os_helper.TESTFN_UNENCODABLE
2312 elif os_helper.TESTFN_NONASCII:
2313 self.dir = os_helper.TESTFN_NONASCII
Serhiy Storchaka43767632013-11-03 21:31:38 +02002314 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002315 self.dir = os_helper.TESTFN
Serhiy Storchaka43767632013-11-03 21:31:38 +02002316 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002317
Serhiy Storchaka43767632013-11-03 21:31:38 +02002318 bytesfn = []
2319 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002320 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002321 fn = os.fsencode(fn)
2322 except UnicodeEncodeError:
2323 return
2324 bytesfn.append(fn)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002325 add_filename(os_helper.TESTFN_UNICODE)
2326 if os_helper.TESTFN_UNENCODABLE:
2327 add_filename(os_helper.TESTFN_UNENCODABLE)
2328 if os_helper.TESTFN_NONASCII:
2329 add_filename(os_helper.TESTFN_NONASCII)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002330 if not bytesfn:
2331 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002332
Serhiy Storchaka43767632013-11-03 21:31:38 +02002333 self.unicodefn = set()
2334 os.mkdir(self.dir)
2335 try:
2336 for fn in bytesfn:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002337 os_helper.create_empty_file(os.path.join(self.bdir, fn))
Serhiy Storchaka43767632013-11-03 21:31:38 +02002338 fn = os.fsdecode(fn)
2339 if fn in self.unicodefn:
2340 raise ValueError("duplicate filename")
2341 self.unicodefn.add(fn)
2342 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002343 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002344 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002345
Serhiy Storchaka43767632013-11-03 21:31:38 +02002346 def tearDown(self):
2347 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002348
Serhiy Storchaka43767632013-11-03 21:31:38 +02002349 def test_listdir(self):
2350 expected = self.unicodefn
2351 found = set(os.listdir(self.dir))
2352 self.assertEqual(found, expected)
2353 # test listdir without arguments
2354 current_directory = os.getcwd()
2355 try:
2356 os.chdir(os.sep)
2357 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2358 finally:
2359 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002360
Serhiy Storchaka43767632013-11-03 21:31:38 +02002361 def test_open(self):
2362 for fn in self.unicodefn:
2363 f = open(os.path.join(self.dir, fn), 'rb')
2364 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002365
Serhiy Storchaka43767632013-11-03 21:31:38 +02002366 @unittest.skipUnless(hasattr(os, 'statvfs'),
2367 "need os.statvfs()")
2368 def test_statvfs(self):
2369 # issue #9645
2370 for fn in self.unicodefn:
2371 # should not fail with file not found error
2372 fullname = os.path.join(self.dir, fn)
2373 os.statvfs(fullname)
2374
2375 def test_stat(self):
2376 for fn in self.unicodefn:
2377 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002378
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Check os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll at most max_attempts times, 0.1 s apart.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached only when the loop ends without ``break``.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console *event* to a helper process and require it to exit.

        *name* is only used in the failure message.  The helper script is
        given a tag name for a one-byte shared mapping; this method waits
        for that byte to become 1 before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 also means the process has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2493
2494
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0/SUB1 directories and FILE0/FILE1 files,
        # remembering the sorted entry names for the assertions.
        self.created_paths = []
        root = os_helper.TESTFN
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(root, file_name)
            os.makedirs(os.path.join(root, dir_name))
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        self.assertEqual(
            listed,
            [os.fsencode(entry) for entry in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        extended = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
        self.assertEqual(
            sorted(os.listdir(extended)),
            self.created_paths)

        # bytes
        extended = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
        self.assertEqual(
            sorted(os.listdir(extended)),
            [os.fsencode(entry) for entry in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002541
2542
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Check os.readlink() with str, bytes and path-like arguments."""

    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert two paths are equal after case normalization.

        On win32 a leading ``\\\\?\\`` prefix is removed from either side
        before comparing.
        """
        def without_prefix(path):
            # Bad practice to blindly strip the prefix as it may be required
            # to correctly refer to the file, but we're only comparing paths
            # here.
            prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
            return path[4:] if path.startswith(prefix) else path

        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        pathlike_target = FakePath(self.filelink_target)
        self.assertRaises(OSError, os.readlink, self.filelink_target)
        self.assertRaises(OSError, os.readlink, pathlike_target)

    def test_missing_link(self):
        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
        self.assertRaises(FileNotFoundError, os.readlink,
                          FakePath('missing-link'))

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2602
2603
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@os_helper.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Windows-only tests for os.symlink() and stat/remove on symlinks."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so the
        # preconditions are still checked when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        # lexists() also detects a dangling link left behind by an
        # earlier run, which exists() would report as absent.
        self.assertFalse(os.path.lexists(self.dirlink))
        self.assertFalse(os.path.lexists(self.filelink))
        self.assertFalse(os.path.lexists(self.missing_link))

    def tearDown(self):
        # lexists() so that links are removed even if their target is gone.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target"""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(os_helper.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(os_helper.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            try:
                os.symlink(src, dest)
            except FileNotFoundError:
                pass
            else:
                try:
                    os.remove(dest)
                except OSError:
                    pass
            # Also test with bytes, since that is a separate code path.
            try:
                os.symlink(os.fsencode(src), os.fsencode(dest))
            except FileNotFoundError:
                pass
            else:
                try:
                    os.remove(dest)
                except OSError:
                    pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2780
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Windows-only tests for directory junctions made by _winapi.CreateJunction."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions instead of bare ``assert`` so the
        # preconditions are still enforced when running under -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2814
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Repeated nt._getfinalpathname() and os.stat() calls must leave the
        # number of open handles of this process unchanged.
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel32 = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel32.GetCurrentProcess.restype = wintypes.HANDLE
        kernel32.GetProcessHandleCount.restype = wintypes.BOOL
        kernel32.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                   wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        process = kernel32.GetCurrentProcess()

        def count_handles():
            # Query the handle count and fail the test if the call fails.
            counter = wintypes.DWORD()
            succeeded = kernel32.GetProcessHandleCount(process,
                                                       ctypes.byref(counter))
            self.assertEqual(1, succeeded)
            return counter.value

        before_count = count_handles()

        # The special names are meant to exercise the error path (any
        # failure is ignored below); __file__ tests the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        # Failure is expected
                        pass

        handle_delta = count_handles() - before_count
        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002864
Hai Shi0c4f0f32020-06-30 21:46:31 +08002865@os_helper.skip_unless_symlink
Jason R. Coombs3a092862013-05-27 23:21:28 -04002866class NonLocalSymlinkTests(unittest.TestCase):
2867
2868 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002869 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002870 Create this structure:
2871
2872 base
2873 \___ some_dir
2874 """
2875 os.makedirs('base/some_dir')
2876
2877 def tearDown(self):
2878 shutil.rmtree('base')
2879
2880 def test_directory_link_nonlocal(self):
2881 """
2882 The symlink target should resolve relative to the link, not relative
2883 to the current directory.
2884
2885 Then, link base/some_link -> base/some_dir and ensure that some_link
2886 is resolved as a directory.
2887
2888 In issue13772, it was discovered that directory detection failed if
2889 the symlink target was not specified relative to the current
2890 directory, which was a defect in the implementation.
2891 """
2892 src = os.path.join('base', 'some_link')
2893 os.symlink('some_dir', src)
2894 assert os.path.isdir(src)
2895
2896
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() are
        # returned unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Name cannot be encoded here; nothing to round-trip.
                continue
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002910
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002911
Brett Cannonefb00c02012-02-29 18:31:31 -05002912
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        nonexistent_fd = 123456
        self.assertIsNone(os.device_encoding(nonexistent_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a name codecs can look up.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2926
2927
class PidTests(unittest.TestCase):
    """Tests for os.getppid(), os.waitpid() and os.waitstatus_to_exitcode()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode, callback=None):
        """Spawn a child running *code* and check what os.waitpid() reports.

        *callback*, when given, is called with the child's pid before
        waiting.  The decoded wait status must equal *exitcode*.
        """
        if sys.platform != 'win32':
            args = [sys.executable, '-c', code]
        else:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            args = [f'"{sys.executable}"', '-c', f'"{code}"']
        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)

        if callback is not None:
            callback(pid)

        # don't use support.wait_process() to test directly os.waitpid()
        # and os.waitstatus_to_exitcode()
        waited_pid, status = os.waitpid(pid, 0)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertEqual(waited_pid, pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitstatus_to_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

        # A float is not an acceptable wait status.
        self.assertRaises(TypeError, os.waitstatus_to_exitcode, 0.0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
        # with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        script = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(script, exitcode=STATUS_CONTROL_C_EXIT)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitstatus_to_exitcode_windows(self):
        max_exitcode = 2 ** 32 - 1
        for expected in (0, 1, 5, max_exitcode):
            decoded = os.waitstatus_to_exitcode(expected << 8)
            self.assertEqual(decoded, expected)

        # invalid values
        self.assertRaises(ValueError, os.waitstatus_to_exitcode,
                          (max_exitcode + 1) << 8)
        self.assertRaises(OverflowError, os.waitstatus_to_exitcode, -1)

    # Skip the test on Windows
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
    def test_waitstatus_to_exitcode_kill(self):
        signum = signal.SIGKILL
        script = f'import time; time.sleep({support.LONG_TIMEOUT})'
        # The child is killed right after it is spawned, so the long sleep
        # never completes; a signal death is reported as -signum.
        self.check_waitpid(script, exitcode=-signum,
                           callback=lambda pid: os.kill(pid, signum))
Victor Stinner65a796e2020-04-01 18:49:29 +02002998
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002999
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv that runs it.

        The script exits with ``self.exitcode``.  With *with_env*, a copy of
        os.environ plus one unique key is stored in ``self.env`` and the
        script also reads that key.  With *use_bytes*, the arguments (and
        the environment, when one was created) are encoded with
        os.fsencode().
        """
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w", encoding="utf-8") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for use_bytes alone.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* (spawnve or spawnvpe) treats odd environments.

        Entries with a null character, or with '=' in the name, must either
        raise ValueError or make the child exit with status 127.  An '=' in
        the value is valid and must reach the child unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = [
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        ]
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                try:
                    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
                except ValueError:
                    pass
                else:
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3175
Serhiy Storchaka77703942017-06-25 07:33:01 +03003176
Brian Curtin0151b8e2010-09-24 13:43:43 +00003177# The introduction of this TestCase caused at least two different errors on
3178# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Sanity check for os.getlogin() (currently always skipped)."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
3185
3186
@unittest.skipUnless(all(hasattr(os, name)
                         for name in ('getpriority', 'setpriority')),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        original = os.getpriority(which, os.getpid())
        os.setpriority(which, os.getpid(), original + 1)
        try:
            raised = os.getpriority(which, os.getpid())
            if original >= 19 and raised <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % original)
            self.assertEqual(raised, original + 1)
        finally:
            # Try to put the original priority back; only an EACCES
            # failure is tolerated here.
            try:
                os.setpriority(which, os.getpid(), original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
3209
3210
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening socket that runs an asyncore loop in its own thread.

    The most recently accepted connection is wrapped in a Handler that
    can accumulate everything it receives.
    """

    class Handler(asynchat.async_chat):
        """Connection handler that collects received data in a buffer."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read but discarded.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        # Block until run() has signalled that the loop is starting.
        self.__flag.wait()

    def stop(self):
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even if asyncore.loop() raises
            # (handle_error re-raises); a bare acquire()/release() pair
            # would leave it held in that case.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3295
3296
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    The class-level fixture writes DATA to os_helper.TESTFN.  Every test
    gets a fresh server, a client socket connected to it (``sockno``) and
    an open handle on the data file (``fileno``).
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # The headers/trailers tests are skipped on Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS,
        'requires headers and trailers support')
    requires_32b = unittest.skipUnless(
        sys.maxsize < 2**32,
        'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = threading_helper.threading_setup()
        create_file(os_helper.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        threading_helper.threading_cleanup(*cls.key)
        os_helper.unlink(os_helper.TESTFN)

    def setUp(self):
        # Each resource registers its cleanup as soon as it exists:
        # tearDown() is not run when setUp() fails, so a failed connect()
        # or open() would otherwise leak the server thread and sockets.
        # Cleanups run in LIFO order: file, client, then server.
        self.server = SendfileTestServer((socket_helper.HOST, 0))
        self.server.start()
        self.addCleanup(self._stop_server)
        self.client = socket.socket()
        self.addCleanup(self.client.close)
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        self.fileno = self.file.fileno()

    def _stop_server(self):
        """Stop the server if it is still running and drop the reference."""
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET) is propagated to the caller.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # transient condition: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                    offset=0, count=4096)
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                        offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, like the other tests)
        # rather than hash values, which give no usable failure message.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = os_helper.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(os_helper.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY/EAGAIN are tolerated for this call; anything else is
            # a real failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003502
3503
def supports_extended_attributes():
    """Return True if a "user.*" extended attribute can be set on TESTFN.

    Returns False when os.setxattr is missing or when setting the probe
    attribute fails with OSError.  The probe file is created exclusively
    and removed again before returning.
    """
    usable = hasattr(os, "setxattr")
    if usable:
        try:
            # "x" mode: creation fails if the probe file already exists.
            with open(os_helper.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    usable = False
        finally:
            os_helper.unlink(os_helper.TESTFN)
    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07003518
3519
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the os.*xattr() functions via paths and file descriptors."""

    def _expect_errno(self, code, func, *args, **kwargs):
        # Call func and require an OSError carrying the given errno.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario on TESTFN.

        *s* converts the attribute names (str or os.fsencode); the four
        callables are the implementations under test and **kwargs is
        forwarded to get/set/remove calls.
        """
        path = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, path)
        create_file(path)

        first = s("user.test")
        second = s("user.test2")

        # Nothing has been set yet.
        self._expect_errno(errno.ENODATA, getxattr, path, first, **kwargs)

        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create with an empty value, then replace it.
        setxattr(path, first, b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name / XATTR_REPLACE on a missing one.
        self._expect_errno(errno.EEXIST, setxattr, path, first, b"bye",
                           os.XATTR_CREATE, **kwargs)
        self._expect_errno(errno.ENODATA, setxattr, path, second, b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(path, second, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute unreadable again.
        removexattr(path, first, **kwargs)
        self._expect_errno(errno.ENODATA, getxattr, path, first, **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, second, **kwargs), b"foo")

        # A 1 KiB value round-trips.
        big_value = b"a"*1024
        setxattr(path, first, big_value, **kwargs)
        self.assertEqual(getxattr(path, first, **kwargs), big_value)
        removexattr(path, first, **kwargs)

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Adapters that open the path and hand the descriptor to os.*xattr.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3603
3604
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping when unavailable.

        The test is skipped on EINVAL/ENOTTY, and on any OSError under
        win32; every other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        # sys.__stdin__ can be None, in which case there is no
        # descriptor to query.
        if sys.__stdin__ is None:
            self.skipTest("sys.__stdin__ is not available")

        try:
            size = (
                subprocess.check_output(
                    ["stty", "size"], stderr=subprocess.DEVNULL, text=True
                ).split()
            )
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3652
3653
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    """Smoke test for os.memfd_create()."""

    def test_memfd_create(self):
        payload = b'memfd_create'

        # Descriptor created with an explicit MFD_CLOEXEC flag.
        flagged = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged, -1)
        self.addCleanup(os.close, flagged)
        self.assertFalse(os.get_inheritable(flagged))
        # closefd=False: the cleanup registered above owns the descriptor.
        with open(flagged, "wb", closefd=False) as stream:
            stream.write(payload)
            self.assertEqual(stream.tell(), 12)

        # Descriptor created with default flags is non-inheritable too.
        plain = os.memfd_create("Hi")
        self.addCleanup(os.close, plain)
        self.assertFalse(os.get_inheritable(plain))
3669
3670
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
    """Tests for os.eventfd(), os.eventfd_read() and os.eventfd_write()."""

    def test_eventfd_initval(self):
        # Values are exchanged with raw read/write as a native uint64_t.
        pack = struct.Struct("@Q").pack
        nbytes = 8  # read/write 8 bytes
        start = 42
        efd = os.eventfd(start)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)
        self.assertFalse(os.get_inheritable(efd))

        # test with raw read/write
        self.assertEqual(os.read(efd, nbytes), pack(start))

        os.write(efd, pack(23))
        self.assertEqual(os.read(efd, nbytes), pack(23))

        # two writes are read back as their sum
        for part in (40, 2):
            os.write(efd, pack(part))
        self.assertEqual(os.read(efd, nbytes), pack(42))

        # test with eventfd_read/eventfd_write
        for part in (20, 3):
            os.eventfd_write(efd, part)
        self.assertEqual(os.eventfd_read(efd), 23)

    def test_eventfd_semaphore(self):
        start = 2
        efd = os.eventfd(
            start, os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)

        # semaphore starts with initval 2, two reads return '1'
        for _ in range(2):
            self.assertEqual(os.eventfd_read(efd), 1)
        # third read would block
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(efd)
        with self.assertRaises(BlockingIOError):
            os.read(efd, 8)

        # increase semaphore counter, read one
        os.eventfd_write(efd, 1)
        self.assertEqual(os.eventfd_read(efd), 1)
        # next read would block, too
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(efd)

    def test_eventfd_select(self):
        efd = os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)

        def poll():
            # Non-blocking readiness check for read, write and exceptional.
            readable, writable, exceptional = select.select(
                [efd], [efd], [efd], 0)
            return (readable, writable, exceptional)

        # counter is zero, only writeable
        self.assertEqual(poll(), ([], [efd], []))

        # counter is non-zero, read and writeable
        os.eventfd_write(efd, 23)
        self.assertEqual(poll(), ([efd], [efd], []))
        self.assertEqual(os.eventfd_read(efd), 23)

        # counter at max, only readable
        os.eventfd_write(efd, (2**64) - 2)
        self.assertEqual(poll(), ([efd], [], []))
        os.eventfd_read(efd)
3752
3753
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the special unencodable/undecodable names when available.
        if os_helper.TESTFN_UNENCODABLE is not None:
            text_name = os_helper.TESTFN_UNENCODABLE
        else:
            text_name = os_helper.TESTFN
        if os_helper.TESTFN_UNDECODABLE is not None:
            raw_name = os_helper.TESTFN_UNDECODABLE
        else:
            raw_name = os.fsencode(os_helper.TESTFN)

        self.unicode_filenames = [text_name, Str(text_name)]
        self.bytes_filenames = [raw_name, bytearray(raw_name),
                                memoryview(raw_name)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        raw = self.bytes_filenames
        text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (filenames to try, function, *extra positional args)
        cases = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            # On win32 the destination type matches the source type and
            # listdir is only tried with str names.
            cases += [
                (raw, os.rename, b"dst"),
                (raw, os.replace, b"dst"),
                (text, os.rename, "dst"),
                (text, os.replace, "dst"),
                (text, os.listdir),
            ]
        else:
            cases += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions taking the path plus fixed extra arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                cases.append((raw, os.link, b"dst"))
                cases.append((text, os.link, "dst"))
            else:
                cases.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            cases.append((every, os.readlink))

        for names, func, *extra in cases:
            for name in names:
                try:
                    if isinstance(name, (str, bytes)):
                        func(name, *extra)
                    else:
                        # Any other path type must trigger a
                        # DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                except OSError as err:
                    # Identity, not equality: the same object must be
                    # reported back.
                    self.assertIs(err.filename, name, str(func))
                    continue
                except UnicodeDecodeError:
                    continue
                self.fail("No exception thrown by {}".format(func))
3846
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        # os.cpu_count() returning None means the count is undetermined.
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3855
Victor Stinnerdaf45552013-08-28 00:53:59 +02003856
3857class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003858 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003859 fd = os.open(__file__, os.O_RDONLY)
3860 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003861 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003862
Victor Stinnerdaf45552013-08-28 00:53:59 +02003863 os.set_inheritable(fd, True)
3864 self.assertEqual(os.get_inheritable(fd), True)
3865
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003866 @unittest.skipIf(fcntl is None, "need fcntl")
3867 def test_get_inheritable_cloexec(self):
3868 fd = os.open(__file__, os.O_RDONLY)
3869 self.addCleanup(os.close, fd)
3870 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003871
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003872 # clear FD_CLOEXEC flag
3873 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3874 flags &= ~fcntl.FD_CLOEXEC
3875 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003876
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003877 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003878
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003879 @unittest.skipIf(fcntl is None, "need fcntl")
3880 def test_set_inheritable_cloexec(self):
3881 fd = os.open(__file__, os.O_RDONLY)
3882 self.addCleanup(os.close, fd)
3883 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3884 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003885
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003886 os.set_inheritable(fd, True)
3887 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3888 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003889
cptpcrd7dc71c42021-01-20 09:05:51 -05003890 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3891 def test_get_set_inheritable_o_path(self):
3892 fd = os.open(__file__, os.O_PATH)
3893 self.addCleanup(os.close, fd)
3894 self.assertEqual(os.get_inheritable(fd), False)
3895
3896 os.set_inheritable(fd, True)
3897 self.assertEqual(os.get_inheritable(fd), True)
3898
3899 os.set_inheritable(fd, False)
3900 self.assertEqual(os.get_inheritable(fd), False)
3901
3902 def test_get_set_inheritable_badf(self):
3903 fd = os_helper.make_bad_fd()
3904
3905 with self.assertRaises(OSError) as ctx:
3906 os.get_inheritable(fd)
3907 self.assertEqual(ctx.exception.errno, errno.EBADF)
3908
3909 with self.assertRaises(OSError) as ctx:
3910 os.set_inheritable(fd, True)
3911 self.assertEqual(ctx.exception.errno, errno.EBADF)
3912
3913 with self.assertRaises(OSError) as ctx:
3914 os.set_inheritable(fd, False)
3915 self.assertEqual(ctx.exception.errno, errno.EBADF)
3916
def test_open(self):
    """A descriptor returned by os.open() must report as non-inheritable."""
    opened_fd = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, opened_fd)
    inheritable = os.get_inheritable(opened_fd)
    self.assertEqual(inheritable, False)
3921
@unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
def test_pipe(self):
    """Both ends of os.pipe() must report as non-inheritable."""
    pipe_fds = os.pipe()
    # Register the read end first, then the write end.
    for end in pipe_fds:
        self.addCleanup(os.close, end)
    for end in pipe_fds:
        self.assertEqual(os.get_inheritable(end), False)
3929
def test_dup(self):
    """A descriptor produced by os.dup() must report as non-inheritable."""
    original = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, original)

    duplicate = os.dup(original)
    self.addCleanup(os.close, duplicate)
    self.assertEqual(os.get_inheritable(duplicate), False)
3937
def test_dup_standard_stream(self):
    """os.dup(1) must succeed and return a descriptor greater than 0."""
    duplicate = os.dup(1)
    self.addCleanup(os.close, duplicate)
    self.assertGreater(duplicate, 0)
3942
@unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
def test_dup_nul(self):
    """os.dup() of the 'NUL' device must give a non-inheritable descriptor."""
    # os.dup() was creating inheritable fds for character files.
    nul_fd = os.open('NUL', os.O_RDONLY)
    self.addCleanup(os.close, nul_fd)
    dup_fd = os.dup(nul_fd)
    self.addCleanup(os.close, dup_fd)
    self.assertFalse(os.get_inheritable(dup_fd))
3951
@unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
def test_dup2(self):
    """Check os.dup2() with and without inheritable=False."""

    def open_tracked():
        # Open this file read-only and schedule the descriptor for closing.
        new_fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, new_fd)
        return new_fd

    source = open_tracked()

    # inheritable by default
    default_target = open_tracked()
    self.assertEqual(os.dup2(source, default_target), default_target)
    self.assertTrue(os.get_inheritable(default_target))

    # force non-inheritable
    private_target = open_tracked()
    self.assertEqual(os.dup2(source, private_target, inheritable=False),
                     private_target)
    self.assertFalse(os.get_inheritable(private_target))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003968
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
def test_openpty(self):
    """Both descriptors from os.openpty() must report as non-inheritable."""
    pty_fds = os.openpty()
    # Cleanups are registered master first, then slave.
    for pty_fd in pty_fds:
        self.addCleanup(os.close, pty_fd)
    for pty_fd in pty_fds:
        self.assertEqual(os.get_inheritable(pty_fd), False)
3976
3977
class PathTConverterTests(unittest.TestCase):
    """Check how several os functions convert their path argument."""

    # Each entry is (function name, whether an fd argument is accepted,
    # additional positional arguments, cleanup callable for the result).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        """Call every listed function with str, bytes, path-like and fd."""
        str_filename = os_helper.TESTFN
        if os.name == 'nt':
            # No bytes variants are exercised on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        # Path flavours that must be accepted; None marks a skipped flavour.
        accepted = [candidate
                    for candidate in (str_filename, bytes_filename,
                                      str_fspath, bytes_fspath)
                    if candidate is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not provided by this build of the os module.
                    continue

                for path in accepted:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like whose __fspath__() gives an int is rejected.
                with self.assertRaisesRegex(TypeError,
                                            'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        """The TypeError message must name the type __fspath__() returned."""
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        samples = ((2, r'int'), (2.34, r'float'), (object(), r'object'))
        for value, type_name in samples:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
4042
Brett Cannon3f9183b2016-08-26 14:44:48 -07004043
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip the blocking mode of a file descriptor."""

    def test_blocking(self):
        """get_blocking() must report what set_blocking() last stored."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the mode after each step.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
4057
4058
Yury Selivanov97e2e062014-09-26 12:33:06 -04004059
class ExportsTests(unittest.TestCase):
    """Check the contents of os.__all__."""

    def test_os_all(self):
        """'open' and 'walk' must both be listed in os.__all__."""
        exported = os.__all__
        self.assertIn('open', exported)
        self.assertIn('walk', exported)
4064
4065
class TestDirEntry(unittest.TestCase):
    """Tests for the os.DirEntry type itself."""

    def setUp(self):
        """Create a scratch directory and register its removal."""
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        """os.DirEntry cannot be instantiated directly."""
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        """Pickling a DirEntry must raise TypeError."""
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # Close the iterator explicitly once the single entry is collected.
        with os.scandir(self.path) as scandir_iter:
            entry = list(scandir_iter).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Pass only the object: the second positional argument of
        # pickle.dumps() is the protocol, and a non-integer protocol raises
        # TypeError by itself, which would hide what is being checked here.
        self.assertRaises(TypeError, pickle.dumps, entry)
4082
4083
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the DirEntry objects it yields."""

    check_no_resource_warning = warnings_helper.check_no_resource_warning

    def setUp(self):
        """Create a scratch directory (str and bytes paths) and its cleanup."""
        self.path = os.path.realpath(os_helper.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        A bytes *name* is joined to the bytes form of the directory path,
        so the returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        Asserts that the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, only the ``st_*`` attributes are
        compared and st_dev, st_ino and st_nlink are ignored.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def test_uninstantiable(self):
        """The scandir iterator type cannot be instantiated directly."""
        # The context manager closes the iterator even if the check fails.
        with os.scandir(self.path) as scandir_iter:
            self.assertRaises(TypeError, type(scandir_iter))

    def test_unpickable(self):
        """Pickling a scandir iterator must raise TypeError."""
        self.create_file("file.txt")
        with os.scandir(self.path) as scandir_iter:
            # Pass only the object: the second positional argument of
            # pickle.dumps() is the protocol, and handing it a path string
            # raises TypeError regardless of the object being pickled.
            self.assertRaises(TypeError, pickle.dumps, scandir_iter)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Compare *entry* against os.stat() results for the same path.

        *is_dir* and *is_file* are not consulted here; *is_symlink* only
        selects which stat fields are compared on Windows.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        """Check entries for a directory, a file, a hard link and symlinks."""
        link = hasattr(os, 'link')
        symlink = os_helper.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the only entry of the scratch directory, named *name*.

        The directory is scanned through its bytes path when *name* is bytes.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry found for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        """os.scandir() without argument lists the current directory."""
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = dict((entry.name, entry) for entry in os.scandir())
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        """repr() of an entry shows its name."""
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        """os.fspath() of an entry gives its full str path."""
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        """os.fspath() of an entry from a bytes scan gives a bytes path."""
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        """Query an entry whose directory was removed after the scan."""
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        """Query an entry whose file was removed after the scan."""
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        """Query an entry for a symlink whose target was removed."""
        if not os_helper.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        """Scanning a bytes path yields bytes name and path."""
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        """bytearray/memoryview paths warn and still yield bytes results."""
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        """Scan a directory through an open file descriptor."""
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if os_helper.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # With an fd, the entry path is expected to be the bare name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        """An empty path raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        """An exhausted iterator yields nothing when consumed again."""
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        """float, dict and list arguments raise TypeError."""
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        """close() may be called repeatedly and prevents a resource warning."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        """Leaving the with block prevents a resource warning on deletion."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        """Calling close() inside the with block must not fail."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        """An exception inside the with block still prevents the warning."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        """Dropping a partly consumed iterator emits ResourceWarning."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
4428
Victor Stinner6036e442015-03-08 01:58:04 +01004429
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and os.PathLike."""

    # Held as a class attribute so that a subclass can substitute another
    # fspath() implementation and rerun the same tests against it.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        """bytes arguments are returned as equal values."""
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        """str arguments are returned as equal values."""
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        """os.fsencode()/os.fsdecode() accept path-like objects."""
        for raw_path in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(raw_path)

            self.assertEqual(raw_path, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        """FakePath is handled by fspath() and counts as os.PathLike."""
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        """Objects that are not str, bytes or path-like raise TypeError."""
        vapor = type('blah', (), {})
        for candidate in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, candidate)

    def test_argument_required(self):
        """Calling fspath() without argument raises TypeError."""
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        """Misbehaving __fspath__ implementations raise the expected error."""
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {})
        not_callable.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        """A subclass of os.PathLike must not inherit its subclass hook."""
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        """Subscripting os.PathLike gives a types.GenericAlias."""
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004486
Victor Stinnerc29b5852017-11-02 07:28:27 -07004487
class TimesTests(unittest.TestCase):
    """Tests for os.times()."""

    def test_times(self):
        """os.times() returns an os.times_result whose fields are floats."""
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            self.assertEqual(result.children_user, 0)
            self.assertEqual(result.children_system, 0)
            self.assertEqual(result.elapsed, 0)
4502
4503
# The TestPEP519 tests already cover the pure Python fspath() when no C
# version exists, so the extra class is only defined when os._fspath is
# available alongside os.fspath.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Explicitly test the pure Python implementation of os.fspath()."""

        # Rerun every inherited test against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004512
4513
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()