blob: 501b4a975566a7d29689a1c80d18461c24a2c5aa [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010018import select
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010023import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020027import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020028import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020029import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070030import types
Victor Stinner47aacc82015-06-12 17:26:23 +020031import unittest
32import uuid
33import warnings
34from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080035from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030037from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080038from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080039from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070040from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020041
# Optional modules: each name is bound to None when the module cannot be
# imported, so tests can check the name before using it.
try:
    import resource
except ImportError:
    resource = None

try:
    import fcntl
except ImportError:
    fcntl = None

try:
    import _winapi
except ImportError:
    _winapi = None

# User ids of every entry in the password database; empty when the pwd
# module (or pwd.getpwall) is not available.
try:
    import pwd
    all_users = [entry.pw_uid for entry in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []

# Without _testcapi, both limits fall back to sys.maxsize.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = sys.maxsize
    PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Christian Heimescd9fed62020-11-13 19:48:52 +010064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080066from test.support import unix_shell
67from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when os.geteuid() exists and reports the superuser (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    sys.platform.startswith('freebsd')
    and os.getgid() == 0
)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
89
90
def create_file(filename, content=b'content'):
    """Create *filename* holding *content* (bytes).

    The file is opened unbuffered in exclusive-creation mode, so the call
    raises FileExistsError if *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
94
95
Victor Stinner689830e2019-06-26 17:31:12 +020096class MiscTests(unittest.TestCase):
97 def test_getcwd(self):
98 cwd = os.getcwd()
99 self.assertIsInstance(cwd, str)
100
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200101 def test_getcwd_long_path(self):
102 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
103 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
104 # longer path if longer paths support is enabled. Internally, the os
105 # module uses MAXPATHLEN which is at least 1024.
106 #
107 # Use a directory name of 200 characters to fit into Windows MAX_PATH
108 # limit.
109 #
110 # On Windows, the test can stop when trying to create a path longer
111 # than MAX_PATH if long paths support is disabled:
112 # see RtlAreLongPathsEnabled().
113 min_len = 2000 # characters
114 dirlen = 200 # characters
115 dirname = 'python_test_dir_'
116 dirname = dirname + ('a' * (dirlen - len(dirname)))
117
118 with tempfile.TemporaryDirectory() as tmpdir:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800119 with os_helper.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200120 expected = path
121
122 while True:
123 cwd = os.getcwd()
124 self.assertEqual(cwd, expected)
125
126 need = min_len - (len(cwd) + len(os.path.sep))
127 if need <= 0:
128 break
129 if len(dirname) > need and need > 0:
130 dirname = dirname[:need]
131
132 path = os.path.join(path, dirname)
133 try:
134 os.mkdir(path)
135 # On Windows, chdir() can fail
136 # even if mkdir() succeeded
137 os.chdir(path)
138 except FileNotFoundError:
139 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
140 # ERROR_FILENAME_EXCED_RANGE (206) errors
141 # ("The filename or extension is too long")
142 break
143 except OSError as exc:
144 if exc.errno == errno.ENAMETOOLONG:
145 break
146 else:
147 raise
148
149 expected = path
150
151 if support.verbose:
152 print(f"Tested current directory length: {len(cwd)}")
153
Victor Stinner689830e2019-06-26 17:31:12 +0200154 def test_getcwdb(self):
155 cwd = os.getcwdb()
156 self.assertIsInstance(cwd, bytes)
157 self.assertEqual(os.fsdecode(cwd), os.getcwd())
158
159
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """File-descriptor level tests of the os module.

    Every test starts and ends with os_helper.TESTFN removed: setUp deletes
    any leftover file (or dangling symlink) and tearDown is the same
    function.
    """

    def setUp(self):
        # lexists() is used so that a broken symlink is removed as well.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so a
            # failing test does not leak it.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console; it must exit with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args),
        then close the resulting file object."""
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create TESTFN first; fdopen_helper() opens it read-only.
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears.
        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # All os.open() parameters can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # All os.symlink() parameters can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected.
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200383
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects returned by os.stat() and os.statvfs().

    setUp creates os_helper.TESTFN with the three bytes b"ABC" and registers
    its removal; the size assertions below rely on that content.
    """

    def setUp(self):
        self.fname = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Verify the os.stat() result for *fname* (str or bytes path)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are compared to the indexed item after
                # truncation to an integer.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the tuple must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the too-short case, acceptance of this tuple
        # is tolerated (no failure is reported) -- presumably because the
        # number of fields differs between platforms; confirm.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                # The type must be pickled by reference as os.stat_result.
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): acceptance of this tuple is tolerated here (no
        # failure is reported when no TypeError is raised); confirm intent.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                # The type must be pickled by reference as os.statvfs_result.
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        # Once closed, the descriptor must be reported as bad (EBADF).
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an int st_file_attributes in
        the range 0..0xFFFFFFFF."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = os_helper.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(os_helper.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612
613class UtimeTests(unittest.TestCase):
614 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800615 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200616 self.fname = os.path.join(self.dirname, "f1")
617
Hai Shi0c4f0f32020-06-30 21:46:31 +0800618 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200619 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100620 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200621
Victor Stinner47aacc82015-06-12 17:26:23 +0200622 def support_subsecond(self, filename):
623 # Heuristic to check if the filesystem supports timestamp with
624 # subsecond resolution: check if float and int timestamps are different
625 st = os.stat(filename)
626 return ((st.st_atime != st[7])
627 or (st.st_mtime != st[8])
628 or (st.st_ctime != st[9]))
629
630 def _test_utime(self, set_time, filename=None):
631 if not filename:
632 filename = self.fname
633
634 support_subsecond = self.support_subsecond(filename)
635 if support_subsecond:
636 # Timestamp with a resolution of 1 microsecond (10^-6).
637 #
638 # The resolution of the C internal function used by os.utime()
639 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
640 # test with a resolution of 1 ns requires more work:
641 # see the issue #15745.
642 atime_ns = 1002003000 # 1.002003 seconds
643 mtime_ns = 4005006000 # 4.005006 seconds
644 else:
645 # use a resolution of 1 second
646 atime_ns = 5 * 10**9
647 mtime_ns = 8 * 10**9
648
649 set_time(filename, (atime_ns, mtime_ns))
650 st = os.stat(filename)
651
652 if support_subsecond:
653 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
654 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
655 else:
656 self.assertEqual(st.st_atime, atime_ns * 1e-9)
657 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
658 self.assertEqual(st.st_atime_ns, atime_ns)
659 self.assertEqual(st.st_mtime_ns, mtime_ns)
660
661 def test_utime(self):
662 def set_time(filename, ns):
663 # test the ns keyword parameter
664 os.utime(filename, ns=ns)
665 self._test_utime(set_time)
666
667 @staticmethod
668 def ns_to_sec(ns):
669 # Convert a number of nanosecond (int) to a number of seconds (float).
670 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
671 # issue, os.utime() rounds towards minus infinity.
672 return (ns * 1e-9) + 0.5e-9
673
674 def test_utime_by_indexed(self):
675 # pass times as floating point seconds as the second indexed parameter
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
681 # or utime(time_t)
682 os.utime(filename, (atime, mtime))
683 self._test_utime(set_time)
684
def test_utime_by_times(self):
    # Exercise the ``times`` keyword parameter of os.utime().
    def apply_times(path, stamps):
        atime, mtime = (self.ns_to_sec(value) for value in stamps)
        os.utime(path, times=(atime, mtime))

    self._test_utime(apply_times)
693
@unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                     "follow_symlinks support for utime required "
                     "for this test.")
def test_utime_nofollow_symlinks(self):
    def apply_nofollow(path, stamps):
        # follow_symlinks=False selects utimensat(timespec)
        # or lutimes(timeval)
        os.utime(path, ns=stamps, follow_symlinks=False)

    self._test_utime(apply_nofollow)
703
@unittest.skipUnless(os.utime in os.supports_fd,
                     "fd support for utime required for this test.")
def test_utime_fd(self):
    def apply_via_fd(path, stamps):
        # Passing a file descriptor selects futimens(timespec)
        # or futimes(timeval).
        with open(path, 'wb', 0) as stream:
            os.utime(stream.fileno(), ns=stamps)

    self._test_utime(apply_via_fd)
713
@unittest.skipUnless(os.utime in os.supports_dir_fd,
                     "dir_fd support for utime required for this test.")
def test_utime_dir_fd(self):
    def apply_via_dir_fd(path, stamps):
        parent, leaf = os.path.split(path)
        parent_fd = os.open(parent, os.O_RDONLY)
        try:
            # dir_fd selects utimensat(timespec) or futimesat(timeval)
            os.utime(leaf, dir_fd=parent_fd, ns=stamps)
        finally:
            os.close(parent_fd)

    self._test_utime(apply_via_dir_fd)
726
def test_utime_directory(self):
    # os.utime() must also work when the target is a directory.
    def apply_ns(path, stamps):
        os.utime(path, ns=stamps)

    self._test_utime(apply_ns, filename=self.dirname)
732
def _test_utime_current(self, set_time):
    """Check that *set_time* stamps ``self.fname`` with the current time.

    *set_time* is called with the file name as its only argument.
    """
    # Read the system clock before the file is touched.
    current = time.time()

    # Ask os.utime() to set the timestamp to the current system clock.
    set_time(self.fname)

    if self.support_subsecond(self.fname):
        # On Windows, the usual resolution of time.time() is 15.6 ms.
        # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
        #
        # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
        # also 50 ms on other platforms.
        delta = 0.050
    else:
        delta = 1.0

    mtime = os.stat(self.fname).st_mtime
    msg = ("st_time=%r, current=%r, dt=%r"
           % (mtime, current, mtime - current))
    self.assertAlmostEqual(mtime, current, delta=delta, msg=msg)
754
def test_utime_current(self):
    # os.utime() called without times must stamp the file with "now".
    def set_time(filename):
        # Set to the current time in the new way.  Act on the path handed
        # in by _test_utime_current() instead of silently ignoring the
        # argument and reaching back to self.fname.
        os.utime(filename)
    self._test_utime_current(set_time)
760
def test_utime_current_old(self):
    # os.utime(path, None) must stamp the file with "now".
    def set_time(filename):
        # Set to the current time in the old explicit way.  Act on the
        # path handed in by _test_utime_current() instead of silently
        # ignoring the argument and reaching back to self.fname.
        os.utime(filename, None)
    self._test_utime_current(set_time)
766
def get_file_system(self, path):
    """Return the file system name of the volume holding *path*.

    Only implemented for Windows (through GetVolumeInformationW); None is
    returned on every other platform and when the lookup fails.
    """
    if sys.platform != 'win32':
        # The file system is unknown on this platform.
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    name_buffer = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        root, None, 0,
        None, None, None,
        name_buffer, len(name_buffer))
    return name_buffer.value if succeeded else None
779
def test_large_time(self):
    # Many filesystems are limited to the year 2038. At least, the test
    # passes with the NTFS filesystem.
    file_system = self.get_file_system(self.dirname)
    if file_system != "NTFS":
        self.skipTest("requires NTFS")

    large = 5000000000   # some day in 2128
    os.utime(self.fname, (large, large))
    stamped = os.stat(self.fname)
    self.assertEqual(stamped.st_mtime, large)
789
def test_utime_invalid_arguments(self):
    target = self.fname

    # seconds and nanoseconds parameters are mutually exclusive
    with self.assertRaises(ValueError):
        os.utime(target, (5, 5), ns=(5, 5))

    # A list, a 1-tuple and a 3-tuple are each rejected, first as
    # *times* and then as *ns*.
    malformed = ([5, 5], (5,), (5, 5, 5))
    for times in malformed:
        with self.assertRaises(TypeError):
            os.utime(target, times)
    for ns in malformed:
        with self.assertRaises(TypeError):
            os.utime(target, ns=ns)

    # Optional features must be refused where the platform lacks them.
    if os.utime not in os.supports_follow_symlinks:
        with self.assertRaises(NotImplementedError):
            os.utime(target, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(target, 'wb', 0) as fp, self.assertRaises(TypeError):
            os.utime(fp.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        with self.assertRaises(NotImplementedError):
            os.utime(target, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200817
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def make_bad_int(divmod_result):
        # Build an object whose __divmod__ returns *divmod_result*
        # whatever the arguments are.
        class BadInt:
            def __divmod__(*args):
                return divmod_result
        return BadInt()

    # A non-tuple, an empty tuple and a 3-tuple are all bad results.
    for bogus in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(make_bad_int(bogus), 1))
833
Victor Stinner47aacc82015-06-12 17:26:23 +0200834
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000835from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000836
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where available, bytes) environment so
        # that tearDown() can put it back.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        """Return the reference key/value pairs installed by setUp()."""
        return dict(KEY1="VALUE1", KEY2="VALUE2", KEY3="VALUE3")

    def _empty_mapping(self):
        """Empty os.environ and return it."""
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
            echoed = popen.read().strip()
            self.assertEqual(echoed, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
                      % unix_shell) as popen:
            lines = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name, content in reference.items():
            self.assertEqual(os.environ.get(name), content)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join('{!r}: {!r}'.format(name, content)
                          for name, content in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(pairs))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        search_env = {'PATH': os.pathsep.join(search_dirs)}

        original_environ = os.environ
        try:
            os.environ = dict(search_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(search_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        bytes_envs = ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'})
        for env in bytes_envs:
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_as_bytes = text.encode(sys.getfilesystemencoding(),
                                        'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_as_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_as_text = raw.decode(sys.getfilesystemencoding(),
                                 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_as_text)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def seen_by_child():
            # Report what a freshly spawned interpreter finds in its
            # environment for *name*.
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(seen_by_child(), repr(value))

            os.unsetenv(name)
            self.assertEqual(seen_by_child(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        invalid_names = ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me')
        rejected = (OSError, ValueError)
        for name in invalid_names:
            self.assertRaises(rejected, os.putenv, name, "value")
            self.assertRaises(rejected, os.unsetenv, name)

        if sys.platform == "win32":
            # On Windows, an environment variable string ("name=value" string)
            # is limited to 32,767 characters
            too_long = 'x' * 32_768
            self.assertRaises(ValueError, os.putenv, too_long, "1")
            self.assertRaises(ValueError, os.putenv, "X", too_long)
            self.assertRaises(ValueError, os.unsetenv, too_long)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup of a missing key: the KeyError carries the very same key
        # object and has its context suppressed.
        with self.assertRaises(KeyError) as caught:
            os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

        # Deletion of a missing key behaves the same way.
        with self.assertRaises(KeyError) as caught:
            del os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        """Add a key to os.environ while *collection* is being iterated."""
        new_key = "__new_key__"
        new_value = "test_environ_iteration"
        walker = iter(collection)

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = new_value

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], new_value)
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        """Check through a shell that *var* expands to *expected*.

        Nothing is checked when no usable unix shell is available.
        """
        have_shell = unix_shell and os.path.exists(unix_shell)
        if not have_shell:
            return

        with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
            value = popen.read().strip()

        self.assertEqual(expected, value)

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        merged = os.environ | new_vars_dict
        self.assertDictEqual(expected, merged)
        self.assertEqual('3', merged[overridden_key])

        # Only mappings are supported on the right-hand side.
        self.assertIs(NotImplemented,
                      os.environ.__or__(new_vars_dict.items()))

        # The process environment itself must be left untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        os.environ |= new_vars_dict
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        # The in-place form must reach the process environment.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        snapshot = os.environ.copy()

        with self.assertRaises(TypeError):
            dict_with_bad_key = {1: '_A_'}
            os.environ |= dict_with_bad_key

        with self.assertRaises(TypeError):
            dict_with_bad_val = {'_A_': 1}
            os.environ |= dict_with_bad_val

        # Check nothing was added.
        self.assertEqual(snapshot, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = dict(os.environ)
        expected.update(new_vars_items)

        os.environ |= new_vars_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**new_vars_dict, **os.environ}

        merged = new_vars_dict | os.environ
        self.assertDictEqual(expected, merged)
        self.assertEqual(original_value, merged[overridden_key])

        # Only mappings are supported on the left-hand side.
        self.assertIs(NotImplemented,
                      os.environ.__ror__(new_vars_dict.items()))

        # The process environment itself must be left untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1128
Victor Stinner6d101392013-04-14 16:35:04 +02001129
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper hiding the minor differences between os.walk and os.fwalk,
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.rmtree, testfn)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp3          (path held in tmp5_path)
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(testfn, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        self.link_path = join(sub2_path, "link")
        t2_path = join(testfn, "TEST2")

        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp4_path = join(testfn, "TEST2", "tmp4")
        tmp5_path = join(sub21_path, "tmp3")

        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x", encoding='utf-8') as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if os_helper.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            sub2_dirs = ["SUB21", "link"]
            sub2_files = ["broken_link", "broken_link2", "broken_link3",
                          "tmp3"]
        else:
            sub2_dirs = ["SUB21"]
            sub2_files = ["tmp3"]
        self.sub2_tree = (sub2_path, sub2_dirs, sub2_files)

        # Make SUB21 unreadable.  If that cannot be enforced (listing it
        # still works), drop SUB21 from the tree altogether.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        sub2_entry = walked[3 - 2 * flipped]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = walked[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        sub2_entry = walked[2 - 2 * flipped]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(sub2_entry,
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not os_helper.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        for root, dirs, files in self.walk(self.walk_path,
                                           follow_symlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])

        # Rename SUB1 behind the walker's back, then let it continue.
        renamed = 'SUB1'
        old_path = os.path.join(root, renamed)
        new_path = os.path.join(root, renamed + '.new')
        os.rename(old_path, new_path)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(old_path, roots)
            self.assertNotIn(new_path, roots)
            for sibling in dirs:
                if sibling != renamed:
                    self.assertIn(os.path.join(root, sibling), roots)
        finally:
            os.rename(new_path, old_path)

    def test_walk_many_open_files(self):
        depth = 30
        base = os.path.join(os_helper.TESTFN, 'deep')
        p = os.path.join(base, *(['d'] * depth))
        os.makedirs(p)

        # 100 bottom-up walkers advanced in lock step, deepest level first.
        iters = [self.walk(base, topdown=False) for j in range(100)]
        for level in range(depth + 1):
            expected = (p, ['d'] if level else [], [])
            for it in iters:
                self.assertEqual(next(it), expected)
            p = os.path.dirname(p)

        # 100 top-down walkers advanced in lock step, starting at *base*.
        iters = [self.walk(base, topdown=True) for j in range(100)]
        p = base
        for level in range(depth + 1):
            expected = (p, ['d'] if level < depth else [], [])
            for it in iters:
                self.assertEqual(next(it), expected)
            p = os.path.join(p, 'd')
1324
Charles-François Natali7372b062012-02-05 15:15:38 +01001325
1326@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1327class FwalkTests(WalkTests):
1328 """Tests for os.fwalk()."""
1329
def walk(self, top, **kwargs):
    """Yield (root, dirs, files) triples taken from self.fwalk().

    The directory file descriptor that fwalk() yields as fourth item is
    dropped, giving the results the shape of os.walk() results.
    """
    for root, dirs, files, _root_fd in self.fwalk(top, **kwargs):
        yield root, dirs, files
1333
def fwalk(self, *args, **kwargs):
    """Call os.fwalk() with the given arguments and return its result."""
    walker = os.fwalk(*args, **kwargs)
    return walker
1336
def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
    """
    Compare self.fwalk() results with os.walk() results.

    Every combination of topdown and follow_symlinks is checked; the
    dicts passed in are left unmodified.
    """
    for topdown, follow_symlinks in itertools.product((True, False),
                                                      repeat=2):
        # os.walk() and os.fwalk() spell the symlink option differently.
        walk_args = dict(walk_kwargs, topdown=topdown,
                         followlinks=follow_symlinks)
        fwalk_args = dict(fwalk_kwargs, topdown=topdown,
                          follow_symlinks=follow_symlinks)

        expected = {
            root: (set(dirs), set(files))
            for root, dirs, files in os.walk(**walk_args)
        }

        for root, dirs, files, rootfd in self.fwalk(**fwalk_args):
            self.assertIn(root, expected)
            self.assertEqual(expected[root], (set(dirs), set(files)))
1354
def test_compare_to_walk(self):
    # The same keyword arguments serve os.walk() and fwalk() alike.
    shared_kwargs = {'top': os_helper.TESTFN}
    self._compare_to_walk(shared_kwargs, shared_kwargs)
1358
Charles-François Natali7372b062012-02-05 15:15:38 +01001359 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001360 try:
1361 fd = os.open(".", os.O_RDONLY)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001362 walk_kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001363 fwalk_kwargs = walk_kwargs.copy()
1364 fwalk_kwargs['dir_fd'] = fd
1365 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1366 finally:
1367 os.close(fd)
1368
1369 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001370 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001371 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001372 args = os_helper.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001373 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001374 # check that the FD is valid
1375 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001376 # redundant check
1377 os.stat(rootfd)
1378 # check that listdir() returns consistent information
1379 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001380
1381 def test_fd_leak(self):
1382 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1383 # we both check that calling fwalk() a large number of times doesn't
1384 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1385 minfd = os.dup(1)
1386 os.close(minfd)
1387 for i in range(256):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001388 for x in self.fwalk(os_helper.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001389 pass
1390 newfd = os.dup(1)
1391 self.addCleanup(os.close, newfd)
1392 self.assertEqual(newfd, minfd)
1393
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001394 # fwalk() keeps file descriptors open
1395 test_walk_many_open_files = None
1396
1397
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Run os.walk() on the bytes form of *top*, yielding decoded str results."""
        # os.walk() names the option 'followlinks'; translate the other spelling.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy any in-place edits the consumer made to the decoded lists
            # back into the lists that os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1410
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on the bytes form of *top*, yielding decoded str results.

        Each yielded tuple is (root, dirs, files, topfd) with the names
        decoded through os.fsdecode(); the file descriptor is passed through
        unchanged.
        """
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Copy any in-place edits the consumer made to the decoded lists
            # back into the lists that os.fwalk() handed out.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1422
Charles-François Natali7372b062012-02-05 15:15:38 +01001423
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs().

    Every test works inside a fresh os_helper.TESTFN directory created by
    setUp(); tearDown() removes whatever part of the dir1/.../dir6 chain
    still exists.
    """

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak the temporary umask into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even if an assertion fails: tearDown() calls
        # os.removedirs() on the deepest existing path and cannot remove a
        # regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1516
Andrew Svetlov405faed2012-12-25 12:18:09 +02001517
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory shared by the whole class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = os_helper.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # None of these numeric types is accepted for uid or gid.
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        target = os_helper.TESTFN
        candidates = groups[:2]
        uid = os.stat(target).st_uid

        # Switch the group to each of the first two groups in turn and
        # confirm that stat() reports the change.
        for wanted_gid in candidates:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        target = os_helper.TESTFN
        candidates = all_users[:2]
        gid = os.stat(target).st_gid
        # Hand the directory to each of the first two users in turn.
        for wanted_uid in candidates:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        # Both calls share one assertRaises block: the block ends at the
        # first call that raises, so the second call only runs when the
        # first one succeeds.
        with self.assertRaises(PermissionError):
            os.chown(os_helper.TESTFN, first_uid, gid)
            os.chown(os_helper.TESTFN, second_uid, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001576
1577
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_chain(self):
        """Create TESTFN/dira/dirb and return the pair (dira, dirb)."""
        outer = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain is removed.
        dira, dirb = self._make_chain()
        os.removedirs(dirb)
        for gone in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail outright.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001617
1618
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        # Unbuffered binary write; the with statement closes the file, so no
        # explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001626
Andrew Svetlov405faed2012-12-25 12:18:09 +02001627
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes that a child interpreter read from os.urandom()."""
        lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        code = '\n'.join(lines)
        out = assert_python_ok('-c', code)
        # Index 1 of the result holds the child's stdout.
        stdout = out[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate child processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001657
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001658
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        # BlockingIOError means the system urandom is not initialized yet.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1700
1701
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(name) == 1
    for name in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001708
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter because it manipulates
    process-wide state (the descriptor limit, or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is not
        # inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        # The child writes two 4-byte samples; they must differ from each
        # other and from the output of a second run.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1788
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001789
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    The value bound by ``with ... as`` is a list to which each stub appends a
    (function name, first arg, args) tuple before raising, since a real exec
    call would never return.  If *defpath* is not None it temporarily
    replaces os.defpath.  The original attributes are restored on exit.
    """
    # One (function name, first arg, args) tuple per intercepted call.
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = {attr: getattr(os, attr)
                 for attr in ('execv', 'execve', 'defpath')}
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        for attr, value in saved.items():
            setattr(os, attr, value)
1822
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for argument validation and path lookup in the os.exec* functions."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument lists, and lists whose first item is empty, are
        # rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check os._execvpe() path resolution for str or bytes program names."""
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            winerror = e.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1927
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001928
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist.

    setUp() fails the test up front if os_helper.TESTFN already exists.
    """

    def setUp(self):
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            # Expected: the tests below need a path that does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # mkdir() is attempted while a file of the same name exists and is
        # still open; the file object itself is not used.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001963
Victor Stinnere77c9742016-03-25 10:28:23 +01001964
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001965class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001966 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001967 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1968 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001969 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001970 def get_single(f):
1971 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001972 if hasattr(os, f):
1973 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001974 return helper
1975 for f in singles:
1976 locals()["test_"+f] = get_single(f)
1977
Benjamin Peterson7522c742009-01-19 21:00:09 +00001978 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001979 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08001980 f(os_helper.make_bad_fd(), *args)
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001981 except OSError as e:
1982 self.assertEqual(e.errno, errno.EBADF)
1983 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001984 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001985 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001986
Serhiy Storchaka43767632013-11-03 21:31:38 +02001987 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001988 def test_isatty(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001989 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001990
Serhiy Storchaka43767632013-11-03 21:31:38 +02001991 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001992 def test_closerange(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001993 fd = os_helper.make_bad_fd()
Serhiy Storchaka43767632013-11-03 21:31:38 +02001994 # Make sure none of the descriptors we are about to close are
1995 # currently valid (issue 6542).
1996 for i in range(10):
1997 try: os.fstat(fd+i)
1998 except OSError:
1999 pass
2000 else:
2001 break
2002 if i < 2:
2003 raise unittest.SkipTest(
2004 "Unable to acquire a range of invalid file descriptors")
2005 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002006
Serhiy Storchaka43767632013-11-03 21:31:38 +02002007 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002008 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002009 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002010
Serhiy Storchaka43767632013-11-03 21:31:38 +02002011 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002012 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002013 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002014
Serhiy Storchaka43767632013-11-03 21:31:38 +02002015 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002016 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002017 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002018
Serhiy Storchaka43767632013-11-03 21:31:38 +02002019 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002020 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002021 self.check(os.pathconf, "PC_NAME_MAX")
2022 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002023
Serhiy Storchaka43767632013-11-03 21:31:38 +02002024 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002025 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002026 self.check(os.truncate, 0)
2027 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002028
Serhiy Storchaka43767632013-11-03 21:31:38 +02002029 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002030 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002031 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002032
Serhiy Storchaka43767632013-11-03 21:31:38 +02002033 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002034 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002035 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002036
Victor Stinner57ddf782014-01-08 15:21:28 +01002037 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2038 def test_readv(self):
2039 buf = bytearray(10)
2040 self.check(os.readv, [buf])
2041
Serhiy Storchaka43767632013-11-03 21:31:38 +02002042 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002043 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002044 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002045
Serhiy Storchaka43767632013-11-03 21:31:38 +02002046 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002047 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002048 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002049
Victor Stinner57ddf782014-01-08 15:21:28 +01002050 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2051 def test_writev(self):
2052 self.check(os.writev, [b'abc'])
2053
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002054 def test_inheritable(self):
2055 self.check(os.get_inheritable)
2056 self.check(os.set_inheritable, True)
2057
2058 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2059 'needs os.get_blocking() and os.set_blocking()')
2060 def test_blocking(self):
2061 self.check(os.get_blocking)
2062 self.check(os.set_blocking, True)
2063
Brian Curtin1b9df392010-11-24 20:24:31 +00002064
2065class LinkTests(unittest.TestCase):
2066 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002067 self.file1 = os_helper.TESTFN
2068 self.file2 = os.path.join(os_helper.TESTFN + "2")
Brian Curtin1b9df392010-11-24 20:24:31 +00002069
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002070 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002071 for file in (self.file1, self.file2):
2072 if os.path.exists(file):
2073 os.unlink(file)
2074
Brian Curtin1b9df392010-11-24 20:24:31 +00002075 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002076 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002077
xdegaye6a55d092017-11-12 17:57:04 +01002078 try:
2079 os.link(file1, file2)
2080 except PermissionError as e:
2081 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00002082 with open(file1, "r") as f1, open(file2, "r") as f2:
2083 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2084
2085 def test_link(self):
2086 self._test_link(self.file1, self.file2)
2087
2088 def test_link_bytes(self):
2089 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2090 bytes(self.file2, sys.getfilesystemencoding()))
2091
Brian Curtinf498b752010-11-30 15:54:04 +00002092 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002093 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002094 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002095 except UnicodeError:
2096 raise unittest.SkipTest("Unable to encode for this platform.")
2097
Brian Curtinf498b752010-11-30 15:54:04 +00002098 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002099 self.file2 = self.file1 + "2"
2100 self._test_link(self.file1, self.file2)
2101
Serhiy Storchaka43767632013-11-03 21:31:38 +02002102@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2103class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002104 # uid_t and gid_t are 32-bit unsigned integers on Linux
2105 UID_OVERFLOW = (1 << 32)
2106 GID_OVERFLOW = (1 << 32)
2107
Serhiy Storchaka43767632013-11-03 21:31:38 +02002108 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2109 def test_setuid(self):
2110 if os.getuid() != 0:
2111 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002112 self.assertRaises(TypeError, os.setuid, 'not an int')
2113 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002114
Serhiy Storchaka43767632013-11-03 21:31:38 +02002115 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2116 def test_setgid(self):
2117 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2118 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002119 self.assertRaises(TypeError, os.setgid, 'not an int')
2120 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002121
Serhiy Storchaka43767632013-11-03 21:31:38 +02002122 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2123 def test_seteuid(self):
2124 if os.getuid() != 0:
2125 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002126 self.assertRaises(TypeError, os.setegid, 'not an int')
2127 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002128
Serhiy Storchaka43767632013-11-03 21:31:38 +02002129 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2130 def test_setegid(self):
2131 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2132 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002133 self.assertRaises(TypeError, os.setegid, 'not an int')
2134 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002135
Serhiy Storchaka43767632013-11-03 21:31:38 +02002136 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2137 def test_setreuid(self):
2138 if os.getuid() != 0:
2139 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002140 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2141 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2142 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2143 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002144
Serhiy Storchaka43767632013-11-03 21:31:38 +02002145 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2146 def test_setreuid_neg1(self):
2147 # Needs to accept -1. We run this in a subprocess to avoid
2148 # altering the test runner's process state (issue8045).
2149 subprocess.check_call([
2150 sys.executable, '-c',
2151 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002152
Serhiy Storchaka43767632013-11-03 21:31:38 +02002153 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2154 def test_setregid(self):
2155 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2156 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002157 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2158 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2159 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2160 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002161
Serhiy Storchaka43767632013-11-03 21:31:38 +02002162 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2163 def test_setregid_neg1(self):
2164 # Needs to accept -1. We run this in a subprocess to avoid
2165 # altering the test runner's process state (issue8045).
2166 subprocess.check_call([
2167 sys.executable, '-c',
2168 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002169
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    def setUp(self):
        # Directory name: first non-empty choice among the unencodable
        # name, the non-ASCII name and the plain TESTFN.
        self.dir = (os_helper.TESTFN_UNENCODABLE
                    or os_helper.TESTFN_NONASCII
                    or os_helper.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; the optional ones are only used when
        # os_helper provides them.
        candidates = [os_helper.TESTFN_UNICODE]
        if os_helper.TESTFN_UNENCODABLE:
            candidates.append(os_helper.TESTFN_UNENCODABLE)
        if os_helper.TESTFN_NONASCII:
            candidates.append(os_helper.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        encoded_names = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                os_helper.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Every created file must be openable through its decoded name.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002241
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1s apart, for the child's message.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the tries ran out or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used in the failure message.  The child runs
        win_console_handler.py (located next to this file) and is given
        the tag name of a one-byte shared mmap; this method waits for that
        byte to become 1 before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test ends, whatever happens.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 must not be mistaken for "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2356
2357
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1 (directories) and FILE0, FILE1
        # (files) and remember their names in sorted order.
        self.created_paths = []
        for index in range(2):
            sub_name = 'SUB%d' % index
            os.makedirs(os.path.join(os_helper.TESTFN, sub_name))

            file_name = 'FILE%d' % index
            file_path = os.path.join(os_helper.TESTFN, file_name)
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)

            self.created_paths.extend([sub_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_listing = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        bytes_listing = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        str_path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
        self.assertEqual(
            sorted(os.listdir(str_path)),
            self.created_paths)

        # bytes
        bytes_path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
        self.assertEqual(
            sorted(os.listdir(bytes_path)),
            [os.fsencode(name) for name in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002404
2405
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # Link name and link target, each as str and as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths (both str or both bytes) are equal.

        Both are case-normalized first; on Windows a leading '\\\\?\\'
        prefix is removed from each side before comparing.
        """
        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def without_prefix(p):
                prefix = b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\'
                return p[4:] if p.startswith(prefix) else p
            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on a regular file raises OSError, for a plain path
        # and for a FakePath wrapper alike.
        for target in (self.filelink_target, FakePath(self.filelink_target)):
            with self.assertRaises(OSError):
                os.readlink(target)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2465
2466
Tim Golden781bbeb2013-10-25 20:24:06 +01002467@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002468@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002469class Win32SymlinkTests(unittest.TestCase):
2470 filelink = 'filelinktest'
2471 filelink_target = os.path.abspath(__file__)
2472 dirlink = 'dirlinktest'
2473 dirlink_target = os.path.dirname(filelink_target)
2474 missing_link = 'missing link'
2475
def setUp(self):
    # Preconditions: both link targets exist and none of the link names
    # is taken.  unittest assertions are used instead of bare ``assert``
    # statements so the checks still run under ``python -O``.
    self.assertTrue(os.path.exists(self.dirlink_target))
    self.assertTrue(os.path.exists(self.filelink_target))
    self.assertFalse(os.path.exists(self.dirlink))
    self.assertFalse(os.path.exists(self.filelink))
    self.assertFalse(os.path.exists(self.missing_link))
2482
def tearDown(self):
    # Remove whatever links the test left behind.  lexists() is used for
    # all three names so that a link whose target is missing is removed
    # too; exists() follows the link and would report it as absent.
    if os.path.lexists(self.filelink):
        os.remove(self.filelink)
    if os.path.lexists(self.dirlink):
        os.rmdir(self.dirlink)
    if os.path.lexists(self.missing_link):
        os.remove(self.missing_link)
2489 os.remove(self.missing_link)
2490
def test_directory_link(self):
    # A link to a directory must exist, look like a directory and be
    # reported as a link; check_stat() then compares the stat results.
    link, target = self.dirlink, self.dirlink_target
    os.symlink(target, link)
    for predicate in (os.path.exists, os.path.isdir, os.path.islink):
        self.assertTrue(predicate(link))
    self.check_stat(link, target)
2497
def test_file_link(self):
    # A link to a file must exist, look like a regular file and be
    # reported as a link; check_stat() then compares the stat results.
    link, target = self.filelink, self.filelink_target
    os.symlink(target, link)
    for predicate in (os.path.exists, os.path.isfile, os.path.islink):
        self.assertTrue(predicate(link))
    self.check_stat(link, target)
2504
2505 def _create_missing_dir_link(self):
2506 'Create a "directory" link to a non-existent target'
2507 linkname = self.missing_link
2508 if os.path.lexists(linkname):
2509 os.remove(linkname)
2510 target = r'c:\\target does not exist.29r3c740'
2511 assert not os.path.exists(target)
2512 target_is_dir = True
2513 os.symlink(target, linkname, target_is_dir)
2514
def test_remove_directory_link_to_missing_target(self):
    self._create_missing_dir_link()
    # For compatibility with Unix, os.remove will check the
    # directory status and call RemoveDirectory if the symlink
    # was created with target_is_dir==True.
    dangling_link = self.missing_link
    os.remove(dangling_link)
2521
def test_isdir_on_directory_link_to_missing_target(self):
    # A directory link whose target is missing is not a directory.
    self._create_missing_dir_link()
    is_dir = os.path.isdir(self.missing_link)
    self.assertFalse(is_dir)
Brian Curtind40e6f72010-07-08 21:39:08 +00002525
def test_rmdir_on_directory_link_to_missing_target(self):
    # os.rmdir() must be able to remove the dangling directory link.
    self._create_missing_dir_link()
    dangling_link = self.missing_link
    os.rmdir(dangling_link)
2529
def check_stat(self, link, target):
    """Check stat()/lstat() of *link* against *target*, as str and bytes.

    os.stat(link) must equal os.stat(target), and os.lstat(link) must
    differ from os.stat(link).  The checks run with *link* as given and
    then with its os.fsencode()d form.
    """
    for convert in (lambda path: path, os.fsencode):
        candidate = convert(link)
        self.assertEqual(os.stat(candidate), os.stat(target))
        self.assertNotEqual(os.lstat(candidate), os.stat(candidate))
Brian Curtind25aef52011-06-13 15:16:04 -05002537
def test_12084(self):
    # Layout built below: TESTFN/file1 and TESTFN/level2/level3, with a
    # relative symlink TESTFN/level2/link -> file1.  os.stat() on the link
    # must match os.stat() on file1 whatever the current directory is.
    level1 = os.path.abspath(os_helper.TESTFN)
    level2 = os.path.join(level1, "level2")
    level3 = os.path.join(level2, "level3")
    self.addCleanup(os_helper.rmtree, level1)

    os.mkdir(level1)
    os.mkdir(level2)
    os.mkdir(level3)

    file1 = os.path.abspath(os.path.join(level1, "file1"))
    create_file(file1)

    # The working directory is changed several times; it is restored in
    # the finally clause.
    orig_dir = os.getcwd()
    try:
        os.chdir(level2)
        link = os.path.join(level2, "link")
        # The link is created with a target relative to level2.
        os.symlink(os.path.relpath(file1), "link")
        self.assertIn("link", os.listdir(os.getcwd()))

        # Check os.stat calls from the same dir as the link
        self.assertEqual(os.stat(file1), os.stat("link"))

        # Check os.stat calls from the dir above the link (level1 is the
        # parent of level2)
        os.chdir(level1)
        self.assertEqual(os.stat(file1),
                         os.stat(os.path.relpath(link)))

        # Check os.stat calls from a dir below the link (level3 is a
        # child of level2)
        os.chdir(level3)
        self.assertEqual(os.stat(file1),
                         os.stat(os.path.relpath(link)))
    finally:
        os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002572
@unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                     and os.path.exists(r'C:\ProgramData'),
                     'Test directories not found')
def test_29248(self):
    # os.symlink() calls CreateSymbolicLink, which creates
    # the reparse data buffer with the print name stored
    # first, so the offset is always 0. CreateSymbolicLink
    # stores the "PrintName" DOS path (e.g. "C:\") first,
    # with an offset of 0, followed by the "SubstituteName"
    # NT path (e.g. "\??\C:\"). The "All Users" link, on
    # the other hand, seems to have been created manually
    # with an inverted order.
    resolved = os.readlink(r'C:\Users\All Users')
    points_to_program_data = os.path.samefile(resolved, r'C:\ProgramData')
    self.assertTrue(points_to_program_data)
2587
def test_buffer_overflow(self):
    # Older versions would overflow a buffer while detecting whether a
    # link source was a directory.  This only ensures that we no longer
    # crash; the outcome of each call is otherwise not validated.
    segment = 'X' * 27
    path = os.path.join(*[segment] * 10)
    test_cases = [
        # overflow with absolute src
        ('\\' + path, segment),
        # overflow dest with relative src
        (segment, path),
        # overflow when joining src
        (path[:180], path[:180]),
    ]
    for src, dest in test_cases:
        # Try str paths first, then bytes, since bytes are handled by a
        # separate code path.
        for as_bytes in (False, True):
            if as_bytes:
                link_args = (os.fsencode(src), os.fsencode(dest))
            else:
                link_args = (src, dest)
            try:
                os.symlink(*link_args)
            except FileNotFoundError:
                continue
            # The link was created: remove it, ignoring any failure.
            try:
                os.remove(dest)
            except OSError:
                pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002622
def test_appexeclink(self):
    """stat() and lstat() agree on an app execution alias, which is not
    reported as a symlink but carries the APPEXECLINK reparse tag."""
    root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
    if not os.path.isdir(root):
        self.skipTest("test requires a WindowsApps directory")

    exe_names = fnmatch.filter(os.listdir(root), '*.exe')
    if not exe_names:
        self.skipTest("test requires an app execution alias")

    # testing the first one we see is sufficient
    alias = os.path.join(root, exe_names[0])
    if support.verbose:
        print()
        print("Testing with", alias)
    st = os.lstat(alias)
    self.assertEqual(st, os.stat(alias))
    self.assertFalse(stat.S_ISLNK(st.st_mode))
    self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2643
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""

    # Path of the junction each test creates (relative to the current
    # working directory).
    junction = 'junctiontest'
    # Existing directory the junction points to: the one holding this file.
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions instead of bare ``assert`` statements so
        # that the preconditions are still verified under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        # Remove the junction if the test created it and did not clean up.
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        """A new junction behaves like a directory resolving to its target."""
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        # stat() follows the junction while lstat() describes the junction
        # itself, so the two results must differ.
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        """os.unlink() removes the junction."""
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2677
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        """nt._getfinalpathname() and os.stat() must not leak handles.

        The handle count of the current process is sampled before and
        after calling both functions repeatedly and must be unchanged.
        """
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE
        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        def process_handle_count():
            # Query the number of handles currently open in this process.
            counter = wintypes.DWORD()
            ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(counter))
            self.assertEqual(1, ok)
            return counter.value

        before_count = process_handle_count()

        # The special paths are meant to exercise the error path (any
        # exception is ignored below); __file__ tests the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                for func in (nt._getfinalpathname, os.stat):
                    try:
                        func(name)
                    except Exception:
                        # Failure is expected
                        pass

        handle_delta = process_handle_count() - before_count
        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002727
@os_helper.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Directory detection for a symlink whose target is relative to the
    link's own directory rather than to the current directory."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Remove the whole tree, including any link a test created in it.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        # 'some_dir' only exists relative to 'base', not to the cwd.
        os.symlink('some_dir', src)
        # A bare ``assert`` would be stripped under ``python -O`` and the
        # test would then check nothing; use a unittest assertion instead.
        self.assertTrue(os.path.isdir(src))
2758
2759
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for fn in candidates:
            try:
                encoded = os.fsencode(fn)
            except UnicodeEncodeError:
                # this name cannot be encoded here: nothing to round-trip
                continue
            self.assertEqual(os.fsdecode(encoded), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002773
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002774
Brett Cannonefb00c02012-02-29 18:31:31 -05002775
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # None is returned when the fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition): it must report an
        # encoding that the codecs registry knows.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        self.assertTrue(codecs.lookup(encoding))
2789
2790
class PidTests(unittest.TestCase):
    """Tests for os.getppid(), os.waitpid() and os.waitstatus_to_exitcode()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode, callback=None):
        """Spawn ``python -c code`` and check how os.waitpid() reports it.

        *callback*, when given, is called with the child's pid before
        waiting.  The wait status converted by os.waitstatus_to_exitcode()
        must equal *exitcode*.
        """
        if sys.platform != 'win32':
            args = [sys.executable, '-c', code]
        else:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            args = [f'"{sys.executable}"', '-c', f'"{code}"']
        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)

        if callback is not None:
            callback(pid)

        # don't use support.wait_process() to test directly os.waitpid()
        # and os.waitstatus_to_exitcode()
        waited_pid, status = os.waitpid(pid, 0)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertEqual(waited_pid, pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitstatus_to_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

        # a float is not accepted as a wait status
        with self.assertRaises(TypeError):
            os.waitstatus_to_exitcode(0.0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
        # with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitstatus_to_exitcode_windows(self):
        max_exitcode = 2 ** 32 - 1
        # the exit code is expected in the status shifted left by 8 bits
        for exitcode in (0, 1, 5, max_exitcode):
            status = exitcode << 8
            self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)

        # invalid values
        with self.assertRaises(ValueError):
            os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
        with self.assertRaises(OverflowError):
            os.waitstatus_to_exitcode(-1)

    # Skip the test on Windows
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
    def test_waitstatus_to_exitcode_kill(self):
        signum = signal.SIGKILL
        # The child sleeps until it is killed by the callback; a process
        # terminated by a signal is reported with a negative exit code.
        code = f'import time; time.sleep({support.LONG_TIMEOUT})'
        self.check_waitpid(code, exitcode=-signum,
                           callback=lambda pid: os.kill(pid, signum))
Victor Stinner65a796e2020-04-01 18:49:29 +02002861
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002862
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script and return the argument list that runs it.

        The script exits with ``self.exitcode``.  When *with_env* is true,
        a copy of os.environ extended with a unique variable is stored in
        ``self.env`` and the script reads that variable before exiting.
        When *use_bytes* is true, the arguments (and the environment, if
        one was built) are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if with_env:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = (f'import sys, os; magic = os.environ[{self.key!r}]; '
                    f'sys.exit({self.exitcode})')
        else:
            code = f'import sys; sys.exit({self.exitcode})'

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for use_bytes alone.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # A missing or empty first argument must be rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* (os.spawnve or os.spawnvpe) handles odd env entries.

        The first three entries must either be rejected with ValueError or
        make the spawn return exit code 127.  An '=' in a value is valid
        and must reach the child unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3038
Serhiy Storchaka77703942017-06-25 07:33:01 +03003039
# Introducing this TestCase caused at least two different errors on *nix
# buildbots, so it is temporarily skipped to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # os.getlogin() must return a non-empty name
        self.assertNotEqual(len(os.getlogin()), 0)
3048
3049
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raise this process's priority value by one and read it back."""
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; being denied
            # (EACCES) is tolerated, any other failure is reported.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
3072
3073
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Small TCP server running an asyncore loop in its own thread.

    It accepts a connection, greets the client with "220 ready" and
    buffers whatever the client sends, so that a test can retrieve the
    received bytes through ``handler_instance.get_data()``.
    """

    class Handler(asynchat.async_chat):
        """Channel for one accepted connection; stores received data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            # Chunks received so far, in arrival order.
            self.in_buffer = []
            # When false, received data is read but discarded.
            self.accumulate = True
            # Set once the connection has been closed.
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all the accumulated data as a single bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Propagate the exception being handled instead of using the
            # inherited error handler.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Actual bound address (the port may have been given as 0).
        self.host, self.port = self.socket.getsockname()[:2]
        # Handler of the most recently accepted connection, if any.
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True while the serving loop is meant to be running."""
        return self._active

    def start(self):
        """Start the server thread and wait until its loop is entered."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the serving loop to exit and wait for the thread to end."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        # Unblock start(), which waits for the loop to be entered.
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # "with" releases the lock even if the poll raises; a manual
            # acquire()/release() pair would leave it held in that case.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    # Reads on the listening socket are handled like connects: close it.
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return 0

    def handle_error(self):
        # Propagate the exception being handled instead of using the
        # inherited error handler.
        raise
3158
3159
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003160@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3161class TestSendfile(unittest.TestCase):
3162
Victor Stinner8c663fd2017-11-08 14:44:44 -08003163 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003164 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003165 not sys.platform.startswith("solaris") and \
3166 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003167 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3168 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003169 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3170 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003171
@classmethod
def setUpClass(cls):
    """Record the threading state and create the file that gets sent."""
    # Kept so that tearDownClass() can hand it back to threading_cleanup().
    cls.key = threading_helper.threading_setup()
    create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003176
@classmethod
def tearDownClass(cls):
    """Undo setUpClass(): clean up threads, then delete the test file."""
    threading_helper.threading_cleanup(*cls.key)
    os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003181
def setUp(self):
    """Start a server, connect a client to it and open the source file."""
    self.server = SendfileTestServer((socket_helper.HOST, 0))
    self.server.start()
    endpoint = (self.server.host, self.server.port)
    self.client = socket.socket()
    self.client.connect(endpoint)
    self.client.settimeout(1)
    # synchronize by waiting for "220 ready" response
    self.client.recv(1024)
    # Descriptors handed to os.sendfile(): socket out, file in.
    self.sockno = self.client.fileno()
    self.file = open(os_helper.TESTFN, 'rb')
    self.fileno = self.file.fileno()
3193
def tearDown(self):
    """Close the file and the client, then stop the server if needed."""
    for resource in (self.file, self.client):
        resource.close()
    # Tests that call server.wait() have already stopped the server.
    if self.server.running:
        self.server.stop()
    self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003200
def sendfile_wrapper(self, *args, **kwargs):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    The call is retried for as long as it fails with EAGAIN or EBUSY;
    any other OSError (ECONNRESET included) is propagated.
    """
    retryable = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            return os.sendfile(*args, **kwargs)
        except OSError as err:
            if err.errno not in retryable:
                # disconnected, or an unexpected failure
                raise
            # otherwise we have to retry sending the data
3217
def test_send_whole_file(self):
    # normal send: push the whole file in chunks of at most 4096 bytes
    expected_size = len(self.DATA)
    chunk = 4096
    offset = total_sent = 0
    while total_sent < expected_size:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, chunk)
        if not sent:
            break
        offset += sent
        total_sent += sent
        self.assertLessEqual(sent, chunk)
        self.assertEqual(offset, total_sent)

    self.assertEqual(total_sent, expected_size)
    # Close our side so the server's handler sees the connection end,
    # then compare what the server received with the file content.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(len(received), expected_size)
    self.assertEqual(received, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003239
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset: here, its second half
    start = len(self.DATA) // 2
    remaining = len(self.DATA) - start
    chunk = 4096
    offset = start
    total_sent = 0
    while total_sent < remaining:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, chunk)
        if not sent:
            break
        offset += sent
        total_sent += sent
        self.assertLessEqual(sent, chunk)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    expected = self.DATA[start:]
    self.assertEqual(total_sent, len(expected))
    self.assertEqual(len(received), len(expected))
    self.assertEqual(received, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003262
def test_offset_overflow(self):
    # Ask sendfile() to start past the end of the file.
    past_eof = len(self.DATA) + 4096
    sent = None
    try:
        sent = os.sendfile(self.sockno, self.fileno, past_eof, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    if sent is not None:
        # The call succeeded, so it must have transferred nothing.
        self.assertEqual(sent, 0)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
3279
def test_invalid_offset(self):
    # A negative offset must be rejected with EINVAL.
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    failure = caught.exception
    self.assertEqual(failure.errno, errno.EINVAL)
3284
def test_keywords(self):
    # Keyword arguments should be supported
    common = dict(out_fd=self.sockno, in_fd=self.fileno,
                  offset=0, count=4096)
    os.sendfile(**common)
    if not self.SUPPORT_HEADERS_TRAILERS:
        return
    # The extended keyword set is only exercised where it is supported.
    os.sendfile(**common, headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003293
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003294 # --- headers / trailers tests
3295
@requires_headers_trailers
def test_headers(self):
    # Send two header buffers followed by the file content (minus its
    # last byte) and check the server receives them in that order.
    total_sent = 0
    expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
    sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                       headers=[b"x" * 512, b"y" * 256])
    self.assertLessEqual(sent, 512 + 256 + 4096)
    total_sent += sent
    offset = 4096
    while total_sent < len(expected_data):
        nbytes = min(len(expected_data) - total_sent, 4096)
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        self.assertLessEqual(sent, nbytes)
        total_sent += sent
        offset += sent

    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Compare the payload itself, not its hash: equal hashes do not prove
    # equal content. Lengths are checked first so a size mismatch gives a
    # short message.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
3320
@requires_headers_trailers
def test_trailers(self):
    # Send the first five bytes of a small scratch file followed by two
    # trailer buffers, and check the concatenation on the server side.
    scratch = os_helper.TESTFN + "2"
    content = b"abcdef"

    self.addCleanup(os_helper.unlink, scratch)
    create_file(scratch, content)

    with open(scratch, 'rb') as src:
        trailer_chunks = [b"123456", b"789"]
        os.sendfile(self.sockno, src.fileno(), 0, 5,
                    trailers=trailer_chunks)
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcde123456789")
3336
@requires_headers_trailers
@requires_32b
def test_headers_overflow_32bits(self):
    # 2**15 header buffers of 2**16 bytes each add up to 2**31 bytes.
    self.server.handler_instance.accumulate = False
    oversized = [b"x" * 2**16] * 2**15
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, 0, 0, headers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
3345
@requires_headers_trailers
@requires_32b
def test_trailers_overflow_32bits(self):
    # 2**15 trailer buffers of 2**16 bytes each add up to 2**31 bytes.
    self.server.handler_instance.accumulate = False
    oversized = [b"x" * 2**16] * 2**15
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, 0, 0, trailers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003354
@requires_headers_trailers
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                     'test needs os.SF_NODISKIO')
def test_flags(self):
    # EBUSY and EAGAIN are accepted outcomes when SF_NODISKIO is passed;
    # any other OSError is a real failure.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno in tolerated:
            return
        raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003365
3366
def supports_extended_attributes():
    """Return True if an extended attribute can be set on a scratch file.

    Returns False when os.setxattr is missing or when setting
    b"user.test" on a freshly created TESTFN raises OSError.  The scratch
    file is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe_path = os_helper.TESTFN
    try:
        # "x" mode: creation fails if the file already exists.
        with open(probe_path, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
            else:
                usable = True
    finally:
        os_helper.unlink(probe_path)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07003381
3382
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the get/set/remove/list scenario against a fresh TESTFN.
        # `s` converts attribute names to the flavour under test (str or
        # bytes); `kwargs` are forwarded to every call except listxattr.
        fn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, fn)
        create_file(fn)

        def expect_errno(code, func, *args):
            # func(fn, *args, **kwargs) must fail with OSError `code`.
            with self.assertRaises(OSError) as caught:
                func(fn, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        baseline = listxattr(fn)
        self.assertIsInstance(baseline, list)

        # Create an empty attribute, then replace its value.
        setxattr(fn, s("user.test"), b"", **kwargs)
        expected_names = set(baseline)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both be rejected.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected_names)
        removexattr(fn, s("user.test"), **kwargs)

        # The removed attribute is gone; the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")

        # A 1 KiB value round-trips.
        big_value = b"a"*1024
        setxattr(fn, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), big_value)
        removexattr(fn, s("user.test"), **kwargs)

        # Many attributes at once are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(fn, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the scratch file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def through_fd(os_func, *open_args):
            # Adapt a path-taking call into one that opens the path and
            # hands the resulting file descriptor to os_func instead.
            def call(path, *args):
                with open(path, *open_args) as fp:
                    return os_func(fp.fileno(), *args)
            return call

        self._check_xattrs(through_fd(os.getxattr, "rb"),
                           through_fd(os.setxattr, "wb", 0),
                           through_fd(os.removexattr, "wb", 0),
                           through_fd(os.listxattr, "rb"))
3466
3467
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _size_or_skip(self, query):
        # Return query(); skip the test if it fails in one of the ways
        # that just means "no terminal here", otherwise re-raise.
        try:
            return query()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            no_terminal = (sys.platform == "win32"
                           or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if no_terminal:
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked portably, so this only
        verifies that both reported dimensions are non-negative.
        """
        size = self._size_or_skip(os.get_terminal_size)

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  When stty succeeds, get_terminal_size() is expected
        to work as well.
        """
        stty_cmd = ["stty", "size"]
        try:
            output = subprocess.check_output(
                stty_cmd, stderr=subprocess.DEVNULL, text=True
            )
            fields = output.split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        # stty prints "rows columns"; get_terminal_size() yields
        # (columns, lines), hence the swap.
        expected = (int(fields[1]), int(fields[0]))

        actual = self._size_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
3515
3516
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # With an explicit MFD_CLOEXEC flag.
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        # closefd=False: the descriptor is closed by the cleanup above.
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # With default flags the descriptor is non-inheritable as well.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3532
3533
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
    def test_eventfd_initval(self):
        # Raw reads and writes exchange the counter as a native uint64_t,
        # i.e. 8 bytes at a time.
        as_uint64 = struct.Struct("@Q").pack
        width = 8
        start = 42
        efd = os.eventfd(start)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)
        self.assertFalse(os.get_inheritable(efd))

        # test with raw read/write
        self.assertEqual(os.read(efd, width), as_uint64(start))

        os.write(efd, as_uint64(23))
        self.assertEqual(os.read(efd, width), as_uint64(23))

        # Two writes before a read are summed.
        for increment in (40, 2):
            os.write(efd, as_uint64(increment))
        self.assertEqual(os.read(efd, width), as_uint64(42))

        # test with eventfd_read/eventfd_write
        for increment in (20, 3):
            os.eventfd_write(efd, increment)
        self.assertEqual(os.eventfd_read(efd), 23)

    def test_eventfd_semaphore(self):
        start = 2
        mode = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
        efd = os.eventfd(start, mode)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)

        # The semaphore starts at 2, so exactly two reads succeed and each
        # one returns 1.
        for _ in range(2):
            self.assertEqual(os.eventfd_read(efd), 1)
        # A third read would block, through either API.
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(efd)
        with self.assertRaises(BlockingIOError):
            os.read(efd, 8)

        # Bump the counter by one and consume it again.
        os.eventfd_write(efd, 1)
        self.assertEqual(os.eventfd_read(efd), 1)
        # The counter is exhausted once more.
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(efd)

    def test_eventfd_select(self):
        mode = os.EFD_CLOEXEC | os.EFD_NONBLOCK
        efd = os.eventfd(0, mode)
        self.assertNotEqual(efd, -1)
        self.addCleanup(os.close, efd)

        def poll():
            # Non-blocking readiness check for read, write and exceptional.
            return tuple(select.select([efd], [efd], [efd], 0))

        # counter is zero, only writeable
        self.assertEqual(poll(), ([], [efd], []))

        # counter is non-zero, read and writeable
        os.eventfd_write(efd, 23)
        self.assertEqual(poll(), ([efd], [efd], []))
        self.assertEqual(os.eventfd_read(efd), 23)

        # counter at max, only readable
        os.eventfd_write(efd, (2**64) - 2)
        self.assertEqual(poll(), ([efd], [], []))
        os.eventfd_read(efd)
3615
3616
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        class Str(str):
            pass

        # Prefer the "hard" file names when the platform provides them.
        if os_helper.TESTFN_UNENCODABLE is not None:
            decoded = os_helper.TESTFN_UNENCODABLE
        else:
            decoded = os_helper.TESTFN
        if os_helper.TESTFN_UNDECODABLE is not None:
            encoded = os_helper.TESTFN_UNDECODABLE
        else:
            encoded = os.fsencode(os_helper.TESTFN)

        # Each name is offered both as the plain type and as other types
        # carrying the same content.
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each entry is (file names to try, function, *extra arguments).
        on_windows = sys.platform == "win32"
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_str = self.unicode_filenames

        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_str, os.rename, "dst"),
                (only_str, os.replace, "dst"),
                (only_str, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        for attr, extra in (("chown", (0, 0)),
                            ("lchown", (0, 0)),
                            ("truncate", (0,)),
                            ("chflags", (0,)),
                            ("lchflags", (0,)),
                            ("chroot", ())):
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_str, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                plain = isinstance(name, (str, bytes))
                try:
                    if plain:
                        func(name, *func_args)
                    else:
                        # Anything else must trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as err:
                    # The error must carry the very object that was passed.
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3709
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() may return None; in that case there is nothing
        # to verify.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3718
Victor Stinnerdaf45552013-08-28 00:53:59 +02003719
class FDInheritanceTests(unittest.TestCase):
    def _open_this_file(self):
        # Open this source file read-only and close it at test teardown.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

        # Making the descriptor inheritable must clear FD_CLOEXEC.
        os.set_inheritable(fd, True)
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for end in ends:
            self.addCleanup(os.close, end)
        for end in ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        duplicate = os.dup(nul_fd)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(source, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private_target = self._open_this_file()
        self.assertEqual(os.dup2(source, private_target, inheritable=False),
                         private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for pty_fd in pty_fds:
            self.addCleanup(os.close, pty_fd)
        for pty_fd in pty_fds:
            self.assertEqual(os.get_inheritable(pty_fd), False)
3812
3813
class PathTConverterTests(unittest.TestCase):
    # Each entry: (os function name, whether a file descriptor is accepted,
    # extra positional arguments, function to call on the result or None).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = os_helper.TESTFN
        if os.name == 'nt':
            # No bytes variants are exercised on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        accepted_paths = [
            candidate
            for candidate in (str_filename, bytes_filename, str_fspath,
                              bytes_fspath)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                for path in accepted_paths:
                    with self.subTest(name=name, path=path):
                        outcome = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(outcome)

                # A path-like whose __fspath__() yields an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    outcome = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(outcome)

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        bad_results = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in bad_results:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
3878
Brett Cannon3f9183b2016-08-26 14:44:48 -07003879
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag each time.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
3893
3894
Yury Selivanov97e2e062014-09-26 12:33:06 -04003895
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Spot-check two names in os.__all__.
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3900
3901
class TestDirEntry(unittest.TestCase):
    def setUp(self):
        # Each test gets its own scratch directory, removed at teardown.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        # os.DirEntry cannot be constructed directly.
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # The with-block closes the scandir iterator deterministically.
        with os.scandir(self.path) as scan:
            entries = list(scan)
        entry = entries.pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # The second positional argument of pickle.dumps() is the protocol,
        # not a file name, so the entry is passed on its own; the TypeError
        # must come from pickling the entry itself.
        self.assertRaises(TypeError, pickle.dumps, entry)
3918
3919
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test runs against a scratch directory created in setUp() and
    removed again through a cleanup.
    """

    check_no_resource_warning = warnings_helper.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(os_helper.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        A bytes *name* is joined to the bytes form of the directory, so the
        returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        Asserts that the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, compare attribute by attribute over the
        ``st_*`` attributes, ignoring st_dev, st_ino and st_nlink.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def test_uninstantiable(self):
        """The scandir iterator type cannot be instantiated from Python."""
        # The context manager closes the iterator even if the assertion fails.
        with os.scandir(self.path) as scandir_iter:
            self.assertRaises(TypeError, type(scandir_iter))

    def test_unpickable(self):
        """Pickling a scandir iterator raises TypeError."""
        self.create_file("file.txt")
        with os.scandir(self.path) as scandir_iter:
            # The second positional argument of pickle.dumps() is the
            # protocol.  Passing the file name there raises TypeError for
            # the bad protocol no matter what object is being pickled, so
            # pass only the iterator.
            self.assertRaises(TypeError, pickle.dumps, scandir_iter)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check one DirEntry against the caller's expectations and os.stat().

        *is_dir* and *is_file* are the expected results of entry.is_dir()
        and entry.is_file() (following symlinks); *is_symlink* is the
        expected result of entry.is_symlink().
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations stated by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check against a stat() that follows symlinks.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Cross-check against a stat() that does not follow symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # The last argument selects the field-by-field comparison that
        # ignores st_dev, st_ino and st_nlink (see assert_stat_equal).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        """Check entries for a directory, a file, a hard link and symlinks."""
        link = hasattr(os, 'link')
        symlink = os_helper.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        Asserts that the directory holds exactly one entry and that it is
        called *name*.  A bytes *name* makes the scan use the bytes path.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create the file *name* and return the DirEntry found for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        """Query an entry whose directory was removed after the scan."""
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        """Query an entry whose file was removed after the scan."""
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        """Query a symlink entry whose target was removed after the scan."""
        if not os_helper.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        """Scanning a bytes path yields bytes names and bytes paths."""
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        """bytearray and memoryview paths work but emit DeprecationWarning."""
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        """Scan a directory given as an open file descriptor."""
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if os_helper.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning a descriptor, path is just the entry name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        """close() may be called repeatedly and prevents a ResourceWarning."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        """Dropping a partially consumed, unclosed iterator warns."""
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
4264
Victor Stinner6036e442015-03-08 01:58:04 +01004265
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Looked up through the class so that a subclass can substitute another
    # implementation (e.g. the pure Python one when a C version is provided).
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        """bytes arguments come back equal to what was passed in."""
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        """str arguments come back equal to what was passed in."""
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        """os.fsencode() and os.fsdecode() accept path-like objects."""
        for raw_path in ("path/like/object", b"path/like/object"):
            wrapped = FakePath(raw_path)

            self.assertEqual(raw_path, self.fspath(wrapped))
            self.assertEqual(b"path/like/object", os.fsencode(wrapped))
            self.assertEqual("path/like/object", os.fsdecode(wrapped))

    def test_pathlike(self):
        """FakePath is recognised as an os.PathLike implementation."""
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertIsInstance(FakePath('x'), os.PathLike)

    def test_garbage_in_exception_out(self):
        """Objects that are not str, bytes or path-like raise TypeError."""
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, junk)

    def test_argument_required(self):
        """Calling fspath() with no argument raises TypeError."""
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        """Misbehaving __fspath__ implementations are reported as errors."""
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class AbstractChild(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, AbstractChild))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        """Subscripting os.PathLike produces a types.GenericAlias."""
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004322
Victor Stinnerc29b5852017-11-02 07:28:27 -07004323
class TimesTests(unittest.TestCase):
    """Checks on the structure returned by os.times()."""

    def test_times(self):
        """os.times() returns an os.times_result whose fields are floats."""
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for name in field_names:
            self.assertIsInstance(getattr(result, name), float)

        if os.name != 'nt':
            return
        # On Windows these three fields are expected to be zero.
        self.assertEqual(result.children_user, 0)
        self.assertEqual(result.children_system, 0)
        self.assertEqual(result.elapsed, 0)
4338
4339
# When os exposes _fspath, a C version of os.fspath() is provided and
# TestPEP519 has exercised that one, so rerun the same tests against the
# pure Python implementation.  Otherwise TestPEP519 already covered it.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        fspath = staticmethod(os._fspath)


# Run the whole module's tests when executed as a script.
if __name__ == "__main__":
    unittest.main()