blob: 5126c84cf30c68b89380176e2fb557bb09349d7e [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080033from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080034from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030035from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080036from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080037from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070038from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020039
# Optional, platform-specific modules: each name is bound to None when the
# module cannot be imported so that tests can check for it before use.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
# User ids of every account reported by pwd.getpwall(); empty when the pwd
# module is missing or does not provide getpwall().
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
# C integer limits; fall back to sys.maxsize for both when _testcapi is
# not available.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020061
Berker Peksagce643912015-05-06 06:33:17 +030062from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080063from test.support import unix_shell
64from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0
70
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    # No thread_info, or an empty/None version string.
    USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated on FreeBSD thanks to the short-circuit "and".
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists.

    *name* is the attribute looked up on the ``os`` module; the skip
    reason reported is ``'requires os.<name>'``.
    """
    return unittest.skipUnless(hasattr(os, name), f'requires os.{name}')
86
87
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation binary mode
    ("xb"), so FileExistsError is raised if *filename* already exists.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)
91
92
class MiscTests(unittest.TestCase):
    """Tests for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        cwd = os.getcwd()
        self.assertIsInstance(cwd, str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        min_len = 2000   # characters
        dirlen = 200     # characters
        dirname = 'python_test_dir_'
        dirname = dirname + ('a' * (dirlen - len(dirname)))

        with tempfile.TemporaryDirectory() as tmpdir:
            with os_helper.change_cwd(tmpdir) as path:
                expected = path

                while True:
                    cwd = os.getcwd()
                    self.assertEqual(cwd, expected)

                    # Characters still missing to reach min_len once a
                    # path separator has been appended.
                    need = min_len - (len(cwd) + len(os.path.sep))
                    if need <= 0:
                        break
                    # need is strictly positive here: shorten the last
                    # component so the path does not overshoot min_len.
                    if len(dirname) > need:
                        dirname = dirname[:need]

                    path = os.path.join(path, dirname)
                    try:
                        os.mkdir(path)
                        # On Windows, chdir() can fail
                        # even if mkdir() succeeded
                        os.chdir(path)
                    except FileNotFoundError:
                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                        # ERROR_FILENAME_EXCED_RANGE (206) errors
                        # ("The filename or extension is too long")
                        break
                    except OSError as exc:
                        if exc.errno != errno.ENAMETOOLONG:
                            raise
                        break

                    expected = path

                if support.verbose:
                    print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        cwd = os.getcwdb()
        self.assertIsInstance(cwd, bytes)
        self.assertEqual(os.fsdecode(cwd), os.getcwd())
155
156
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file operations of the os module on TESTFN."""

    def setUp(self):
        # lexists() is used so that a dangling symlink is removed as well.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not leak a reference to its argument.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion above fails.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(
            args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Make sure TESTFN exists before fdopen_helper() opens it read-only.
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears.
        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        # Cleanups run in reverse order: the file is closed, then unlinked.
        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            copied = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(copied, range(0, 6))

            with open(TESTFN2, 'rb') as result_file:
                self.assertEqual(result_file.read(), data[:copied])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        # Cleanups run in reverse order: the file is closed, then unlinked.
        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            copied = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                        offset_src=in_skip,
                                        offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(copied, range(0, bytes_to_copy + 1))

            with open(TESTFN4, 'rb') as result_file:
                read = result_file.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00' * out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip + copied])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200380
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test gets a fresh 3-byte file that is removed afterwards.
        self.fname = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path).

        The file is expected to be 3 bytes long, as created by setUp().
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The tuple items hold integer timestamps while the
                # attributes are floats: truncate before comparing.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing out of range must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # Only a TypeError is tolerated here; success is accepted too.
        try:
            os.stat_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # Only a TypeError is tolerated here; success is accepted too.
        try:
            os.statvfs_result(
                (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd fails cleanly with EBADF
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an st_file_attributes int in [0, 0xFFFFFFFF]."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = os_helper.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(os_helper.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
608
Victor Stinner47aacc82015-06-12 17:26:23 +0200609
610class UtimeTests(unittest.TestCase):
611 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800612 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200613 self.fname = os.path.join(self.dirname, "f1")
614
Hai Shi0c4f0f32020-06-30 21:46:31 +0800615 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200616 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100617 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200618
Victor Stinner47aacc82015-06-12 17:26:23 +0200619 def support_subsecond(self, filename):
620 # Heuristic to check if the filesystem supports timestamp with
621 # subsecond resolution: check if float and int timestamps are different
622 st = os.stat(filename)
623 return ((st.st_atime != st[7])
624 or (st.st_mtime != st[8])
625 or (st.st_ctime != st[9]))
626
627 def _test_utime(self, set_time, filename=None):
628 if not filename:
629 filename = self.fname
630
631 support_subsecond = self.support_subsecond(filename)
632 if support_subsecond:
633 # Timestamp with a resolution of 1 microsecond (10^-6).
634 #
635 # The resolution of the C internal function used by os.utime()
636 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
637 # test with a resolution of 1 ns requires more work:
638 # see the issue #15745.
639 atime_ns = 1002003000 # 1.002003 seconds
640 mtime_ns = 4005006000 # 4.005006 seconds
641 else:
642 # use a resolution of 1 second
643 atime_ns = 5 * 10**9
644 mtime_ns = 8 * 10**9
645
646 set_time(filename, (atime_ns, mtime_ns))
647 st = os.stat(filename)
648
649 if support_subsecond:
650 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
651 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
652 else:
653 self.assertEqual(st.st_atime, atime_ns * 1e-9)
654 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
655 self.assertEqual(st.st_atime_ns, atime_ns)
656 self.assertEqual(st.st_mtime_ns, mtime_ns)
657
658 def test_utime(self):
659 def set_time(filename, ns):
660 # test the ns keyword parameter
661 os.utime(filename, ns=ns)
662 self._test_utime(set_time)
663
664 @staticmethod
665 def ns_to_sec(ns):
666 # Convert a number of nanosecond (int) to a number of seconds (float).
667 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
668 # issue, os.utime() rounds towards minus infinity.
669 return (ns * 1e-9) + 0.5e-9
670
671 def test_utime_by_indexed(self):
672 # pass times as floating point seconds as the second indexed parameter
673 def set_time(filename, ns):
674 atime_ns, mtime_ns = ns
675 atime = self.ns_to_sec(atime_ns)
676 mtime = self.ns_to_sec(mtime_ns)
677 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
678 # or utime(time_t)
679 os.utime(filename, (atime, mtime))
680 self._test_utime(set_time)
681
682 def test_utime_by_times(self):
683 def set_time(filename, ns):
684 atime_ns, mtime_ns = ns
685 atime = self.ns_to_sec(atime_ns)
686 mtime = self.ns_to_sec(mtime_ns)
687 # test the times keyword parameter
688 os.utime(filename, times=(atime, mtime))
689 self._test_utime(set_time)
690
691 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
692 "follow_symlinks support for utime required "
693 "for this test.")
694 def test_utime_nofollow_symlinks(self):
695 def set_time(filename, ns):
696 # use follow_symlinks=False to test utimensat(timespec)
697 # or lutimes(timeval)
698 os.utime(filename, ns=ns, follow_symlinks=False)
699 self._test_utime(set_time)
700
701 @unittest.skipUnless(os.utime in os.supports_fd,
702 "fd support for utime required for this test.")
703 def test_utime_fd(self):
704 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100705 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200706 # use a file descriptor to test futimens(timespec)
707 # or futimes(timeval)
708 os.utime(fp.fileno(), ns=ns)
709 self._test_utime(set_time)
710
711 @unittest.skipUnless(os.utime in os.supports_dir_fd,
712 "dir_fd support for utime required for this test.")
713 def test_utime_dir_fd(self):
714 def set_time(filename, ns):
715 dirname, name = os.path.split(filename)
716 dirfd = os.open(dirname, os.O_RDONLY)
717 try:
718 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
719 os.utime(name, dir_fd=dirfd, ns=ns)
720 finally:
721 os.close(dirfd)
722 self._test_utime(set_time)
723
724 def test_utime_directory(self):
725 def set_time(filename, ns):
726 # test calling os.utime() on a directory
727 os.utime(filename, ns=ns)
728 self._test_utime(set_time, filename=self.dirname)
729
730 def _test_utime_current(self, set_time):
731 # Get the system clock
732 current = time.time()
733
734 # Call os.utime() to set the timestamp to the current system clock
735 set_time(self.fname)
736
737 if not self.support_subsecond(self.fname):
738 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700739 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200740 # On Windows, the usual resolution of time.time() is 15.6 ms.
741 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700742 #
743 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
744 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200745 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200746 st = os.stat(self.fname)
747 msg = ("st_time=%r, current=%r, dt=%r"
748 % (st.st_mtime, current, st.st_mtime - current))
749 self.assertAlmostEqual(st.st_mtime, current,
750 delta=delta, msg=msg)
751
752 def test_utime_current(self):
753 def set_time(filename):
754 # Set to the current time in the new way
755 os.utime(self.fname)
756 self._test_utime_current(set_time)
757
758 def test_utime_current_old(self):
759 def set_time(filename):
760 # Set to the current time in the old explicit way.
761 os.utime(self.fname, None)
762 self._test_utime_current(set_time)
763
def get_file_system(self, path):
    # Return the file system name reported by GetVolumeInformationW() for
    # the drive that holds path.  Only implemented on Windows.
    # Return None if the filesystem is unknown.
    if sys.platform != 'win32':
        return None

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    import ctypes
    name_buffer = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        root, None, 0,
        None, None, None,
        name_buffer, len(name_buffer))
    return name_buffer.value if succeeded else None
776
777 def test_large_time(self):
778 # Many filesystems are limited to the year 2038. At least, the test
779 # pass with NTFS filesystem.
780 if self.get_file_system(self.dirname) != "NTFS":
781 self.skipTest("requires NTFS")
782
783 large = 5000000000 # some day in 2128
784 os.utime(self.fname, (large, large))
785 self.assertEqual(os.stat(self.fname).st_mtime, large)
786
787 def test_utime_invalid_arguments(self):
788 # seconds and nanoseconds parameters are mutually exclusive
789 with self.assertRaises(ValueError):
790 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200791 with self.assertRaises(TypeError):
792 os.utime(self.fname, [5, 5])
793 with self.assertRaises(TypeError):
794 os.utime(self.fname, (5,))
795 with self.assertRaises(TypeError):
796 os.utime(self.fname, (5, 5, 5))
797 with self.assertRaises(TypeError):
798 os.utime(self.fname, ns=[5, 5])
799 with self.assertRaises(TypeError):
800 os.utime(self.fname, ns=(5,))
801 with self.assertRaises(TypeError):
802 os.utime(self.fname, ns=(5, 5, 5))
803
804 if os.utime not in os.supports_follow_symlinks:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), follow_symlinks=False)
807 if os.utime not in os.supports_fd:
808 with open(self.fname, 'wb', 0) as fp:
809 with self.assertRaises(TypeError):
810 os.utime(fp.fileno(), (5, 5))
811 if os.utime not in os.supports_dir_fd:
812 with self.assertRaises(NotImplementedError):
813 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200814
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def get_bad_int(divmod_ret_val):
        # Build an object whose __divmod__ returns the given value
        # whatever it is called with.
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    # A non-tuple, an empty tuple and a tuple of the wrong length.
    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(bogus_result), 1))
830
Victor Stinner47aacc82015-06-12 17:26:23 +0200831
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000832from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000833
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    # No mapping type is supplied; the mapping under test is os.environ
    # itself, handed out by _empty_mapping() below.
    # NOTE(review): type2test is presumably read by the mapping_tests base
    # class -- confirm there.
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes view where available) so
        # that tearDown() can restore it, then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for key in reference:
            os.environ[key] = reference[key]

    def tearDown(self):
        # Put back exactly what setUp() saved: first the str view, then
        # the bytes view where it exists.
        snapshots = [(os.environ, self.__save)]
        if os.supports_bytes_environ:
            snapshots.append((os.environb, self.__saveb))
        for mapping, saved in snapshots:
            mapping.clear()
            mapping.update(saved)

    def _reference(self):
        # Known items installed into os.environ by setUp().
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        # os.environ itself, emptied first.
        environ = os.environ
        environ.clear()
        return environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        # A variable added with os.environ.update() must be visible to a
        # shell started afterwards.
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read()
            self.assertEqual(output.strip(), "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        # Iterating over the object returned by os.popen() yields the
        # output line by line and then stops.
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for item in os.environ.items():
            for part in item:
                self.assertEqual(type(part), str)

    def test_items(self):
        # Every reference item installed by setUp() can be read back.
        reference = self._reference()
        for key in reference:
            self.assertEqual(os.environ.get(key), reference[key])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        environ = os.environ
        rendered_items = ['{!r}: {!r}'.format(key, value)
                          for key, value in environ.items()]
        expected = 'environ({{{}}})'.format(', '.join(rendered_items))
        self.assertEqual(repr(environ), expected)

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ by a plain dict to check the
        # default used when no env argument (or env=None) is given.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            for found in (os.get_exec_path(), os.get_exec_path(env=None)):
                self.assertSequenceEqual(test_path, found)
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            with self.assertRaises(ValueError):
                os.get_exec_path(mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env),
                                     ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                fs_encoding,)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def child_view():
            # Run a fresh interpreter and return what it prints for the
            # variable, i.e. repr() of its value or of None.
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(child_view(), repr(value))

            os.unsetenv(name)
            self.assertEqual(child_view(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        rejected = (OSError, ValueError)

        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
            with self.assertRaises(rejected):
                os.putenv(name, "value")
            with self.assertRaises(rejected):
                os.unsetenv(name)

        if sys.platform != "win32":
            return

        # On Windows, an environment variable string ("name=value" string)
        # is limited to 32,767 characters
        longstr = 'x' * 32_768
        with self.assertRaises(ValueError):
            os.putenv(longstr, "1")
        with self.assertRaises(ValueError):
            os.putenv("X", longstr)
        with self.assertRaises(ValueError):
            os.unsetenv(longstr)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        def check(error):
            # The KeyError must carry the very key object that was used
            # and must have its exception context suppressed.
            self.assertIs(error.args[0], missing)
            self.assertTrue(error.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        check(cm.exception)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        check(cm.exception)

    def _test_environ_iteration(self, collection):
        # Start iterating over collection, add a key to os.environ, then
        # advance the iterator once more.
        new_key = "__new_key__"
        cursor = iter(collection)

        next(cursor)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(cursor)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Ask a shell for the value of var and compare it with expected.
        # Nothing is checked when no usable shell is available.
        if not unix_shell or not os.path.exists(unix_shell):
            return

        with os.popen(f"{unix_shell} -c 'echo ${var}'") as pipe:
            output = pipe.read()

        self.assertEqual(expected, output.strip())

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        actual = os.environ | new_vars_dict
        self.assertDictEqual(expected, actual)
        self.assertEqual('3', actual[overridden_key])

        # An operand that is not a mapping is not supported.
        self.assertIs(NotImplemented,
                      os.environ.__or__(new_vars_dict.items()))

        # "|" builds a new mapping; the process environment is untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        os.environ |= new_vars_dict
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        # "|=" updates in place, so a shell sees the new values.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        os_environ_copy = os.environ.copy()

        # First a non-str key, then a non-str value.
        for bad_mapping in ({1: '_A_'}, {'_A_': 1}):
            with self.assertRaises(TypeError):
                os.environ |= bad_mapping

        # Check nothing was added.
        self.assertEqual(os_environ_copy, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = {**os.environ, **dict(new_vars_items)}

        os.environ |= new_vars_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        # os.environ is the right-hand operand, so its values win.
        expected = {**new_vars_dict, **os.environ}

        actual = new_vars_dict | os.environ
        self.assertDictEqual(expected, actual)
        self.assertEqual(original_value, actual[overridden_key])

        # An operand that is not a mapping is not supported.
        self.assertIs(NotImplemented,
                      os.environ.__ror__(new_vars_dict.items()))

        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1125
Victor Stinner6d101392013-04-14 16:35:04 +02001126
Tim Petersc4e09402003-04-25 07:11:48 +00001127class WalkTests(unittest.TestCase):
1128 """Tests for os.walk()."""
1129
Victor Stinner0561c532015-03-12 10:28:24 +01001130 # Wrapper to hide minor differences between os.walk and os.fwalk
1131 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001132 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001133 if 'follow_symlinks' in kwargs:
1134 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001135 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001136
Charles-François Natali7372b062012-02-05 15:15:38 +01001137 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001138 join = os.path.join
Hai Shi0c4f0f32020-06-30 21:46:31 +08001139 self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001140
1141 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001142 # TESTFN/
1143 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001144 # tmp1
1145 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001146 # tmp2
1147 # SUB11/ no kids
1148 # SUB2/ a file kid and a dirsymlink kid
1149 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001150 # SUB21/ not readable
1151 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001152 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001153 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001154 # broken_link2
1155 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001156 # TEST2/
1157 # tmp4 a lone file
Hai Shi0c4f0f32020-06-30 21:46:31 +08001158 self.walk_path = join(os_helper.TESTFN, "TEST1")
Victor Stinner0561c532015-03-12 10:28:24 +01001159 self.sub1_path = join(self.walk_path, "SUB1")
1160 self.sub11_path = join(self.sub1_path, "SUB11")
1161 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001162 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001163 tmp1_path = join(self.walk_path, "tmp1")
1164 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001165 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001166 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001167 self.link_path = join(sub2_path, "link")
Hai Shi0c4f0f32020-06-30 21:46:31 +08001168 t2_path = join(os_helper.TESTFN, "TEST2")
1169 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001170 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001171 broken_link2_path = join(sub2_path, "broken_link2")
1172 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001173
1174 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001175 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001176 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001177 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001178 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001179
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001180 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03001181 with open(path, "x", encoding='utf-8') as f:
Victor Stinnere77c9742016-03-25 10:28:23 +01001182 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001183
Hai Shi0c4f0f32020-06-30 21:46:31 +08001184 if os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001185 os.symlink(os.path.abspath(t2_path), self.link_path)
1186 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001187 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1188 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001189 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001190 ["broken_link", "broken_link2", "broken_link3",
1191 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001192 else:
pxinwr3e028b22019-02-15 13:04:47 +08001193 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001194
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001195 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001196 try:
1197 os.listdir(sub21_path)
1198 except PermissionError:
1199 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1200 else:
1201 os.chmod(sub21_path, stat.S_IRWXU)
1202 os.unlink(tmp5_path)
1203 os.rmdir(sub21_path)
1204 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001205
Victor Stinner0561c532015-03-12 10:28:24 +01001206 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001207 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001208 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001209
Tim Petersc4e09402003-04-25 07:11:48 +00001210 self.assertEqual(len(all), 4)
1211 # We can't know which order SUB1 and SUB2 will appear in.
1212 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1213 # flipped: TESTFN, SUB2, SUB1, SUB11
1214 flipped = all[0][1][0] != "SUB1"
1215 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001216 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001217 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001218 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1219 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1220 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1221 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001222
Brett Cannon3f9183b2016-08-26 14:44:48 -07001223 def test_walk_prune(self, walk_path=None):
1224 if walk_path is None:
1225 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001226 # Prune the search.
1227 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001228 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001229 all.append((root, dirs, files))
1230 # Don't descend into SUB1.
1231 if 'SUB1' in dirs:
1232 # Note that this also mutates the dirs we appended to all!
1233 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001234
Victor Stinner0561c532015-03-12 10:28:24 +01001235 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001236 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001237
1238 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001239 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001240 self.assertEqual(all[1], self.sub2_tree)
1241
Brett Cannon3f9183b2016-08-26 14:44:48 -07001242 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001243 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001244
Victor Stinner0561c532015-03-12 10:28:24 +01001245 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001246 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001247 all = list(self.walk(self.walk_path, topdown=False))
1248
Victor Stinner53b0a412016-03-26 01:12:36 +01001249 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001250 # We can't know which order SUB1 and SUB2 will appear in.
1251 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1252 # flipped: SUB2, SUB11, SUB1, TESTFN
1253 flipped = all[3][1][0] != "SUB1"
1254 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001255 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001256 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001257 self.assertEqual(all[3],
1258 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1259 self.assertEqual(all[flipped],
1260 (self.sub11_path, [], []))
1261 self.assertEqual(all[flipped + 1],
1262 (self.sub1_path, ["SUB11"], ["tmp2"]))
1263 self.assertEqual(all[2 - 2 * flipped],
1264 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001265
Victor Stinner0561c532015-03-12 10:28:24 +01001266 def test_walk_symlink(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001267 if not os_helper.can_symlink():
Victor Stinner0561c532015-03-12 10:28:24 +01001268 self.skipTest("need symlink support")
1269
1270 # Walk, following symlinks.
1271 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1272 for root, dirs, files in walk_it:
1273 if root == self.link_path:
1274 self.assertEqual(dirs, [])
1275 self.assertEqual(files, ["tmp4"])
1276 break
1277 else:
1278 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001279
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001280 def test_walk_bad_dir(self):
1281 # Walk top-down.
1282 errors = []
1283 walk_it = self.walk(self.walk_path, onerror=errors.append)
1284 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001285 self.assertEqual(errors, [])
1286 dir1 = 'SUB1'
1287 path1 = os.path.join(root, dir1)
1288 path1new = os.path.join(root, dir1 + '.new')
1289 os.rename(path1, path1new)
1290 try:
1291 roots = [r for r, d, f in walk_it]
1292 self.assertTrue(errors)
1293 self.assertNotIn(path1, roots)
1294 self.assertNotIn(path1new, roots)
1295 for dir2 in dirs:
1296 if dir2 != dir1:
1297 self.assertIn(os.path.join(root, dir2), roots)
1298 finally:
1299 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001300
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001301 def test_walk_many_open_files(self):
1302 depth = 30
Hai Shi0c4f0f32020-06-30 21:46:31 +08001303 base = os.path.join(os_helper.TESTFN, 'deep')
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001304 p = os.path.join(base, *(['d']*depth))
1305 os.makedirs(p)
1306
1307 iters = [self.walk(base, topdown=False) for j in range(100)]
1308 for i in range(depth + 1):
1309 expected = (p, ['d'] if i else [], [])
1310 for it in iters:
1311 self.assertEqual(next(it), expected)
1312 p = os.path.dirname(p)
1313
1314 iters = [self.walk(base, topdown=True) for j in range(100)]
1315 p = base
1316 for i in range(depth + 1):
1317 expected = (p, ['d'] if i < depth else [], [])
1318 for it in iters:
1319 self.assertEqual(next(it), expected)
1320 p = os.path.join(p, 'd')
1321
Charles-François Natali7372b062012-02-05 15:15:38 +01001322
1323@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1324class FwalkTests(WalkTests):
1325 """Tests for os.fwalk()."""
1326
def walk(self, top, **kwargs):
    # Adapt self.fwalk() to the os.walk() interface: yield
    # (root, dirs, files) and drop the trailing directory file descriptor.
    for dirpath, dirnames, filenames, _dir_fd in self.fwalk(top, **kwargs):
        yield dirpath, dirnames, filenames
1330
def fwalk(self, *args, **kwargs):
    # Single place through which the tests reach os.fwalk(); all
    # arguments are forwarded untouched.
    walker = os.fwalk(*args, **kwargs)
    return walker
1333
Larry Hastingsc48fe982012-06-25 04:49:05 -07001334 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1335 """
1336 compare with walk() results.
1337 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001338 walk_kwargs = walk_kwargs.copy()
1339 fwalk_kwargs = fwalk_kwargs.copy()
1340 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1341 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1342 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001343
Charles-François Natali7372b062012-02-05 15:15:38 +01001344 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001345 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001346 expected[root] = (set(dirs), set(files))
1347
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001348 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001349 self.assertIn(root, expected)
1350 self.assertEqual(expected[root], (set(dirs), set(files)))
1351
def test_compare_to_walk(self):
    # Same keyword arguments for both functions: only the tree root.
    options = {'top': os_helper.TESTFN}
    self._compare_to_walk(walk_kwargs=options, fwalk_kwargs=options)
1355
Charles-François Natali7372b062012-02-05 15:15:38 +01001356 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001357 try:
1358 fd = os.open(".", os.O_RDONLY)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001359 walk_kwargs = {'top': os_helper.TESTFN}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001360 fwalk_kwargs = walk_kwargs.copy()
1361 fwalk_kwargs['dir_fd'] = fd
1362 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1363 finally:
1364 os.close(fd)
1365
1366 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001367 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001368 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001369 args = os_helper.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001370 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001371 # check that the FD is valid
1372 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001373 # redundant check
1374 os.stat(rootfd)
1375 # check that listdir() returns consistent information
1376 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001377
1378 def test_fd_leak(self):
1379 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1380 # we both check that calling fwalk() a large number of times doesn't
1381 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1382 minfd = os.dup(1)
1383 os.close(minfd)
1384 for i in range(256):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001385 for x in self.fwalk(os_helper.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001386 pass
1387 newfd = os.dup(1)
1388 self.addCleanup(os.close, newfd)
1389 self.assertEqual(newfd, minfd)
1390
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001391 # fwalk() keeps file descriptors open
1392 test_walk_many_open_files = None
1393
1394
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Run os.walk() on a bytes path while presenting str to the caller.

        Accepts the fwalk()-style ``follow_symlinks`` keyword and maps it
        to os.walk()'s ``followlinks``.  After each yield, the caller's
        (possibly edited) str lists are encoded back into the bytes lists
        that os.walk() handed out.
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Write back in place so os.walk() sees the caller's changes.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1407
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on a bytes path while presenting str to the caller.

        Yields (root, dirs, files, topfd) with the names decoded to str.
        After each yield, the caller's (possibly edited) str lists are
        encoded back into the bytes lists that os.fwalk() handed out.
        """
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Write back in place so os.fwalk() sees the caller's changes.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1419
Charles-François Natali7372b062012-02-05 15:15:38 +01001420
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs().

    setUp() creates os_helper.TESTFN as a directory; tearDown() removes
    whatever part of the TESTFN/dir1/.../dir6 chain a test created.
    """

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # Restore the umask even when an assertion above fails, so a
            # failure here cannot leak a modified umask into later tests.
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Must happen before tearDown(): its os.removedirs() call
            # cannot remove a regular file named 'dir1'.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1513
Andrew Svetlov405faed2012-12-25 12:18:09 +02001514
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory at os_helper.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = os_helper.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # Non-integer numbers must be rejected for both uid and gid.
        bad_values = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in bad_values:
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        target = os_helper.TESTFN
        uid = os.stat(target).st_uid

        # Switch to each of the first two groups in turn and read it back.
        for wanted_gid in groups[:2]:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        target = os_helper.TESTFN
        gid = os.stat(target).st_gid
        # Switch to each of the first two users in turn and read it back.
        for wanted_uid in all_users[:2]:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        target = os_helper.TESTFN
        first_uid, second_uid = all_users[:2]
        gid = os.stat(target).st_gid
        # Both calls share one assertRaises block, so the second call is
        # reached only when the first one does not raise.
        with self.assertRaises(PermissionError):
            os.chown(target, first_uid, gid)
            os.chown(target, second_uid, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001573
1574
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira and TESTFN/dira/dirb; return both paths."""
        outer = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory in the chain is empty, so all of them go.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes the very first removal fail.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001614
1615
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        # Writes to the null device are accepted and discarded ...
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # ... and reading from it yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001623
Andrew Svetlov405faed2012-12-25 12:18:09 +02001624
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result must be exactly as long as requested.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child Python."""
        lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        result = assert_python_ok('-c', '\n'.join(lines))
        # Index 1 of the result is the child's captured stdout.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001654
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001655
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        payload = os.getrandom(16)
        self.assertIsInstance(payload, bytes)
        self.assertEqual(len(payload), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists.
        # BlockingIOError means the system urandom is not initialized yet.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1697
1698
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True when any of the corresponding build-time config variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001705
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Each scenario runs in a child interpreter because it manipulates
    process-wide state (the descriptor limit or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the
        # file descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is
        # not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        # Within one run the two 4-byte reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not reproduce the first run's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1785
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001786
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001787@contextlib.contextmanager
1788def _execvpe_mockup(defpath=None):
1789 """
1790 Stubs out execv and execve functions when used as context manager.
1791 Records exec calls. The mock execv and execve functions always raise an
1792 exception as they would normally never return.
1793 """
1794 # A list of tuples containing (function name, first arg, args)
1795 # of calls to execv or execve that have been made.
1796 calls = []
1797
1798 def mock_execv(name, *args):
1799 calls.append(('execv', name, args))
1800 raise RuntimeError("execv called")
1801
1802 def mock_execve(name, *args):
1803 calls.append(('execve', name, args))
1804 raise OSError(errno.ENOTDIR, "execve called")
1805
1806 try:
1807 orig_execv = os.execv
1808 orig_execve = os.execve
1809 orig_defpath = os.defpath
1810 os.execv = mock_execv
1811 os.execve = mock_execve
1812 if defpath is not None:
1813 os.defpath = defpath
1814 yield calls
1815 finally:
1816 os.execv = orig_execv
1817 os.execve = orig_execve
1818 os.defpath = orig_defpath
1819
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family of functions."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument lists and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check which exec function os._execvpe() calls, and with what.

        *test_type* is ``bytes`` to use a bytes program name; any other
        value uses str.  The real exec functions are replaced by
        _execvpe_mockup(), so nothing is actually executed.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            self.assertTrue(e.winerror is None or e.winerror != 0)
        else:
            self.fail('No OSError raised')
1924
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001925
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing or wrong-type path.

    setUp() fails the test outright unless os_helper.TESTFN is absent.
    """

    def setUp(self):
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            # Expected: the path must not exist when a test starts.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # mkdir() must fail while a regular file occupies the name.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001960
Victor Stinnere77c9742016-03-25 10:28:23 +01001961
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid file descriptor."""

    # Names of os functions that are called with the bad descriptor as
    # their only argument; a test_<name> method is generated for each one.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately not part of the list because it doesn't raise
    # an exception on some platforms.

    def get_single(f):
        # Build the test method for the os function named *f*.  The method
        # does nothing when the platform does not provide that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper

    # Install the generated methods directly into the class namespace.
    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(os_helper.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        bad_fd = os_helper.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = os_helper.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                # fd + i is invalid, keep extending the range.
                continue
            # fd + i is a live descriptor: the invalid range ends before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based entry point are handed the
        # bad descriptor.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based entry point are handed the
        # bad descriptor.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buffers = [bytearray(10)]
        self.check(os.readv, buffers)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        # The getter and the setter must both reject the bad descriptor.
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        # The getter and the setter must both reject the bad descriptor.
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
2060
Brian Curtin1b9df392010-11-24 20:24:31 +00002061
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # file1 is the link source, file2 the hard link created from it.
        self.file1 = os_helper.TESTFN
        self.file2 = os_helper.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test managed to create.
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        """Hard-link *file1* to *file2* and check both names open the same file.

        The calling test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with open(file1, "r") as src, open(file2, "r") as dst:
            same = os.path.sameopenfile(src.fileno(), dst.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # Same check with both names encoded to bytes.
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent "\xf1".
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rebind the attributes so that tearDown() removes the new names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
2098
Serhiy Storchaka43767632013-11-03 21:31:38 +02002099@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2100class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002101 # uid_t and gid_t are 32-bit unsigned integers on Linux
2102 UID_OVERFLOW = (1 << 32)
2103 GID_OVERFLOW = (1 << 32)
2104
Serhiy Storchaka43767632013-11-03 21:31:38 +02002105 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2106 def test_setuid(self):
2107 if os.getuid() != 0:
2108 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002109 self.assertRaises(TypeError, os.setuid, 'not an int')
2110 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002111
Serhiy Storchaka43767632013-11-03 21:31:38 +02002112 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2113 def test_setgid(self):
2114 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2115 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002116 self.assertRaises(TypeError, os.setgid, 'not an int')
2117 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002118
Serhiy Storchaka43767632013-11-03 21:31:38 +02002119 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2120 def test_seteuid(self):
2121 if os.getuid() != 0:
2122 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002123 self.assertRaises(TypeError, os.setegid, 'not an int')
2124 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002125
Serhiy Storchaka43767632013-11-03 21:31:38 +02002126 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2127 def test_setegid(self):
2128 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2129 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002130 self.assertRaises(TypeError, os.setegid, 'not an int')
2131 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002132
Serhiy Storchaka43767632013-11-03 21:31:38 +02002133 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2134 def test_setreuid(self):
2135 if os.getuid() != 0:
2136 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002137 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2138 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2139 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2140 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002141
Serhiy Storchaka43767632013-11-03 21:31:38 +02002142 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2143 def test_setreuid_neg1(self):
2144 # Needs to accept -1. We run this in a subprocess to avoid
2145 # altering the test runner's process state (issue8045).
2146 subprocess.check_call([
2147 sys.executable, '-c',
2148 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002149
Serhiy Storchaka43767632013-11-03 21:31:38 +02002150 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2151 def test_setregid(self):
2152 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2153 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002154 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2155 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2156 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2157 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002158
Serhiy Storchaka43767632013-11-03 21:31:38 +02002159 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2160 def test_setregid_neg1(self):
2161 # Needs to accept -1. We run this in a subprocess to avoid
2162 # altering the test runner's process state (issue8045).
2163 subprocess.check_call([
2164 sys.executable, '-c',
2165 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002166
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Run os functions on a directory holding non-ASCII file names."""

    def setUp(self):
        # Use the first non-empty candidate as the directory name.
        self.dir = (os_helper.TESTFN_UNENCODABLE
                    or os_helper.TESTFN_NONASCII
                    or os_helper.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always tried; the other two names only when
        # os_helper provides them.
        candidates = [os_helper.TESTFN_UNICODE]
        if os_helper.TESTFN_UNENCODABLE:
            candidates.append(os_helper.TESTFN_UNENCODABLE)
        if os_helper.TESTFN_NONASCII:
            candidates.append(os_helper.TESTFN_NONASCII)

        # Keep only the names that the filesystem encoding can represent.
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                os_helper.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() does not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        found = set(os.listdir(self.dir))
        self.assertEqual(found, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file is the whole check; nothing is read from it.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                            "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            fullname = os.path.join(self.dir, name)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002238
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 second apart.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached without "break": the tries ran out or the subprocess
            # exited before writing the message.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # reports readiness through a shared one-byte memory map, send it
        # *event* with os.kill() and fail if it is still running afterwards.
        # *name* is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # The byte is polled until it reads 1, which is taken as the
            # subprocess's readiness signal.
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; an
        # exit code of 0 also means that the process has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2353
2354
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two files and
        # remember their names, sorted, for the comparisons in the tests.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(os_helper.TESTFN, dir_name))
            file_path = os.path.join(os_helper.TESTFN, file_name)
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(os_helper.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002401
2402
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    # Link name and link target, each as str and as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths are equal after case normalization.

        On Windows a leading '\\\\?\\' prefix is dropped from either path
        before the comparison.
        """
        def strip_prefix(path):
            # Bad practice to blindly strip the prefix as it may be required
            # to correctly refer to the file, but we're only comparing paths
            # here.
            prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
            if path.startswith(prefix):
                return path[4:]
            return path

        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            left = strip_prefix(left)
            right = strip_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on something that is not a link raises OSError, both
        # for a plain str and for a path-like object.
        filelink_target = FakePath(self.filelink_target)
        for arg in (self.filelink_target, filelink_target):
            self.assertRaises(OSError, os.readlink, arg)

    def test_missing_link(self):
        for arg in ('missing-link', FakePath('missing-link')):
            self.assertRaises(FileNotFoundError, os.readlink, arg)

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        result = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(result, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        result = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(result, self.filelinkb_target)
        # A bytes link name must produce a bytes result.
        self.assertIsInstance(result, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        result = os.readlink(self.filelinkb)
        self.assertPathEqual(result, self.filelinkb_target)
        # A bytes link name must produce a bytes result.
        self.assertIsInstance(result, bytes)
2462
2463
Tim Golden781bbeb2013-10-25 20:24:06 +01002464@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002465@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002466class Win32SymlinkTests(unittest.TestCase):
2467 filelink = 'filelinktest'
2468 filelink_target = os.path.abspath(__file__)
2469 dirlink = 'dirlinktest'
2470 dirlink_target = os.path.dirname(filelink_target)
2471 missing_link = 'missing link'
2472
2473 def setUp(self):
2474 assert os.path.exists(self.dirlink_target)
2475 assert os.path.exists(self.filelink_target)
2476 assert not os.path.exists(self.dirlink)
2477 assert not os.path.exists(self.filelink)
2478 assert not os.path.exists(self.missing_link)
2479
2480 def tearDown(self):
2481 if os.path.exists(self.filelink):
2482 os.remove(self.filelink)
2483 if os.path.exists(self.dirlink):
2484 os.rmdir(self.dirlink)
2485 if os.path.lexists(self.missing_link):
2486 os.remove(self.missing_link)
2487
2488 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002489 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002490 self.assertTrue(os.path.exists(self.dirlink))
2491 self.assertTrue(os.path.isdir(self.dirlink))
2492 self.assertTrue(os.path.islink(self.dirlink))
2493 self.check_stat(self.dirlink, self.dirlink_target)
2494
2495 def test_file_link(self):
2496 os.symlink(self.filelink_target, self.filelink)
2497 self.assertTrue(os.path.exists(self.filelink))
2498 self.assertTrue(os.path.isfile(self.filelink))
2499 self.assertTrue(os.path.islink(self.filelink))
2500 self.check_stat(self.filelink, self.filelink_target)
2501
2502 def _create_missing_dir_link(self):
2503 'Create a "directory" link to a non-existent target'
2504 linkname = self.missing_link
2505 if os.path.lexists(linkname):
2506 os.remove(linkname)
2507 target = r'c:\\target does not exist.29r3c740'
2508 assert not os.path.exists(target)
2509 target_is_dir = True
2510 os.symlink(target, linkname, target_is_dir)
2511
2512 def test_remove_directory_link_to_missing_target(self):
2513 self._create_missing_dir_link()
2514 # For compatibility with Unix, os.remove will check the
2515 # directory status and call RemoveDirectory if the symlink
2516 # was created with target_is_dir==True.
2517 os.remove(self.missing_link)
2518
Brian Curtind40e6f72010-07-08 21:39:08 +00002519 def test_isdir_on_directory_link_to_missing_target(self):
2520 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002521 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002522
Brian Curtind40e6f72010-07-08 21:39:08 +00002523 def test_rmdir_on_directory_link_to_missing_target(self):
2524 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002525 os.rmdir(self.missing_link)
2526
2527 def check_stat(self, link, target):
2528 self.assertEqual(os.stat(link), os.stat(target))
2529 self.assertNotEqual(os.lstat(link), os.stat(link))
2530
Brian Curtind25aef52011-06-13 15:16:04 -05002531 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002532 self.assertEqual(os.stat(bytes_link), os.stat(target))
2533 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002534
2535 def test_12084(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002536 level1 = os.path.abspath(os_helper.TESTFN)
Brian Curtind25aef52011-06-13 15:16:04 -05002537 level2 = os.path.join(level1, "level2")
2538 level3 = os.path.join(level2, "level3")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002539 self.addCleanup(os_helper.rmtree, level1)
Victor Stinnerae39d232016-03-24 17:12:55 +01002540
2541 os.mkdir(level1)
2542 os.mkdir(level2)
2543 os.mkdir(level3)
2544
2545 file1 = os.path.abspath(os.path.join(level1, "file1"))
2546 create_file(file1)
2547
2548 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002549 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002550 os.chdir(level2)
2551 link = os.path.join(level2, "link")
2552 os.symlink(os.path.relpath(file1), "link")
2553 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002554
Victor Stinnerae39d232016-03-24 17:12:55 +01002555 # Check os.stat calls from the same dir as the link
2556 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002557
Victor Stinnerae39d232016-03-24 17:12:55 +01002558 # Check os.stat calls from a dir below the link
2559 os.chdir(level1)
2560 self.assertEqual(os.stat(file1),
2561 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002562
Victor Stinnerae39d232016-03-24 17:12:55 +01002563 # Check os.stat calls from a dir above the link
2564 os.chdir(level3)
2565 self.assertEqual(os.stat(file1),
2566 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002567 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002568 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002569
SSE43c34aad2018-02-13 00:10:35 +07002570 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2571 and os.path.exists(r'C:\ProgramData'),
2572 'Test directories not found')
2573 def test_29248(self):
2574 # os.symlink() calls CreateSymbolicLink, which creates
2575 # the reparse data buffer with the print name stored
2576 # first, so the offset is always 0. CreateSymbolicLink
2577 # stores the "PrintName" DOS path (e.g. "C:\") first,
2578 # with an offset of 0, followed by the "SubstituteName"
2579 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2580 # the other hand, seems to have been created manually
2581 # with an inverted order.
2582 target = os.readlink(r'C:\Users\All Users')
2583 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2584
Steve Dower6921e732018-03-05 14:26:08 -08002585 def test_buffer_overflow(self):
2586 # Older versions would have a buffer overflow when detecting
2587 # whether a link source was a directory. This test ensures we
2588 # no longer crash, but does not otherwise validate the behavior
2589 segment = 'X' * 27
2590 path = os.path.join(*[segment] * 10)
2591 test_cases = [
2592 # overflow with absolute src
2593 ('\\' + path, segment),
2594 # overflow dest with relative src
2595 (segment, path),
2596 # overflow when joining src
2597 (path[:180], path[:180]),
2598 ]
2599 for src, dest in test_cases:
2600 try:
2601 os.symlink(src, dest)
2602 except FileNotFoundError:
2603 pass
2604 else:
2605 try:
2606 os.remove(dest)
2607 except OSError:
2608 pass
2609 # Also test with bytes, since that is a separate code path.
2610 try:
2611 os.symlink(os.fsencode(src), os.fsencode(dest))
2612 except FileNotFoundError:
2613 pass
2614 else:
2615 try:
2616 os.remove(dest)
2617 except OSError:
2618 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002619
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002620 def test_appexeclink(self):
2621 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002622 if not os.path.isdir(root):
2623 self.skipTest("test requires a WindowsApps directory")
2624
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002625 aliases = [os.path.join(root, a)
2626 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2627
2628 for alias in aliases:
2629 if support.verbose:
2630 print()
2631 print("Testing with", alias)
2632 st = os.lstat(alias)
2633 self.assertEqual(st, os.stat(alias))
2634 self.assertFalse(stat.S_ISLNK(st.st_mode))
2635 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2636 # testing the first one we see is sufficient
2637 break
2638 else:
2639 self.skipTest("test requires an app execution alias")
2640
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # Tests for directory junctions created with _winapi.CreateJunction().

    # Path of the junction each test creates (relative to the cwd).
    junction = 'junctiontest'
    # Existing directory the junction points to: the one holding this file.
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions instead of bare ``assert`` statements so
        # that the preconditions are still checked under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        # lexists() also catches a junction whose target has vanished.
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        # stat() follows the junction, lstat() describes the junction itself.
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2674
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Calling nt._getfinalpathname() and os.stat() repeatedly must
        # leave the process handle count unchanged.
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes
        wintypes = ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = wintypes.DWORD()

        def current_handle_count():
            # Query the handle count of this process, checking the call
            # itself succeeded.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = current_handle_count()

        # The first three names are meant to hit the error path,
        # __file__ tests the success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected for some names; only the handle
                # count matters here.
                with contextlib.suppress(Exception):
                    nt._getfinalpathname(name)
                with contextlib.suppress(Exception):
                    os.stat(name)

        handle_delta = current_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002724
Hai Shi0c4f0f32020-06-30 21:46:31 +08002725@os_helper.skip_unless_symlink
Jason R. Coombs3a092862013-05-27 23:21:28 -04002726class NonLocalSymlinkTests(unittest.TestCase):
2727
2728 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002729 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002730 Create this structure:
2731
2732 base
2733 \___ some_dir
2734 """
2735 os.makedirs('base/some_dir')
2736
2737 def tearDown(self):
2738 shutil.rmtree('base')
2739
2740 def test_directory_link_nonlocal(self):
2741 """
2742 The symlink target should resolve relative to the link, not relative
2743 to the current directory.
2744
2745 Then, link base/some_link -> base/some_dir and ensure that some_link
2746 is resolved as a directory.
2747
2748 In issue13772, it was discovered that directory detection failed if
2749 the symlink target was not specified relative to the current
2750 directory, which was a defect in the implementation.
2751 """
2752 src = os.path.join('base', 'some_link')
2753 os.symlink('some_dir', src)
2754 assert os.path.isdir(src)
2755
2756
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # Input that already has the target type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002770
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002771
Brett Cannonefb00c02012-02-29 18:31:31 -05002772
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for stdin must name a codec Python knows.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2786
2787
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002788class PidTests(unittest.TestCase):
2789 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2790 def test_getppid(self):
2791 p = subprocess.Popen([sys.executable, '-c',
2792 'import os; print(os.getppid())'],
2793 stdout=subprocess.PIPE)
2794 stdout, _ = p.communicate()
2795 # We are the parent of our subprocess
2796 self.assertEqual(int(stdout), os.getpid())
2797
Victor Stinner9bee32b2020-04-22 16:30:35 +02002798 def check_waitpid(self, code, exitcode, callback=None):
2799 if sys.platform == 'win32':
2800 # On Windows, os.spawnv() simply joins arguments with spaces:
2801 # arguments need to be quoted
2802 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2803 else:
2804 args = [sys.executable, '-c', code]
2805 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002806
Victor Stinner9bee32b2020-04-22 16:30:35 +02002807 if callback is not None:
2808 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002809
Victor Stinner9bee32b2020-04-22 16:30:35 +02002810 # don't use support.wait_process() to test directly os.waitpid()
2811 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002812 pid2, status = os.waitpid(pid, 0)
2813 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2814 self.assertEqual(pid2, pid)
2815
Victor Stinner9bee32b2020-04-22 16:30:35 +02002816 def test_waitpid(self):
2817 self.check_waitpid(code='pass', exitcode=0)
2818
2819 def test_waitstatus_to_exitcode(self):
2820 exitcode = 23
2821 code = f'import sys; sys.exit({exitcode})'
2822 self.check_waitpid(code, exitcode=exitcode)
2823
2824 with self.assertRaises(TypeError):
2825 os.waitstatus_to_exitcode(0.0)
2826
2827 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2828 def test_waitpid_windows(self):
2829 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2830 # with exit code larger than INT_MAX.
2831 STATUS_CONTROL_C_EXIT = 0xC000013A
2832 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2833 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2834
2835 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2836 def test_waitstatus_to_exitcode_windows(self):
2837 max_exitcode = 2 ** 32 - 1
2838 for exitcode in (0, 1, 5, max_exitcode):
2839 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2840 exitcode)
2841
2842 # invalid values
2843 with self.assertRaises(ValueError):
2844 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2845 with self.assertRaises(OverflowError):
2846 os.waitstatus_to_exitcode(-1)
2847
Victor Stinner65a796e2020-04-01 18:49:29 +02002848 # Skip the test on Windows
2849 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2850 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002851 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002852 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002853
Victor Stinner9bee32b2020-04-22 16:30:35 +02002854 def kill_process(pid):
2855 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002856
Victor Stinner9bee32b2020-04-22 16:30:35 +02002857 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002858
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002859
class SpawnTests(unittest.TestCase):
    # Tests for the os.spawn*() family of functions.

    def create_args(self, *, with_env=False, use_bytes=False):
        # Write a small script to TESTFN that exits with self.exitcode and
        # return the argv list that runs it.
        #
        # with_env:  also build self.env (a copy of os.environ plus a unique
        #            key stored in self.key) and make the script read that
        #            key, so it fails if the environment was not passed on.
        # use_bytes: return argv (and self.env, when it was built) encoded
        #            with os.fsencode().
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        # Shared body for the spawnve()/spawnvpe() invalid-environment
        # tests; *spawn* is called as spawn(mode, path, args, env).
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            # Either the call is rejected with ValueError, or the spawn
            # goes ahead and must report exit code 127.
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3035
Serhiy Storchaka77703942017-06-25 07:33:01 +03003036
Brian Curtin0151b8e2010-09-24 13:43:43 +00003037# The introduction of this TestCase caused at least two different errors on
3038# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The login name must be a non-empty value.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
3045
3046
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; EACCES is tolerated,
            # any other error propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
3069
3070
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    # Small asyncore-based TCP server running in its own thread.  Each
    # accepted connection gets a Handler that greets the client and buffers
    # whatever the client sends, so tests can inspect it afterwards.

    class Handler(asynchat.async_chat):
        # Per-connection handler that accumulates the received bytes.

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            # Chunks received so far (only filled while self.accumulate).
            self.in_buffer = []
            self.accumulate = True
            # Set once handle_close() has run.
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            # Return everything received so far as a single bytes object.
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled instead of
            # letting asyncore swallow it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        # Handler of the most recently accepted connection, if any.
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        # Start the thread and block until run() has signalled startup.
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        # Ask the loop in run() to exit and wait for the thread to finish.
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even if asyncore.loop() raises
            # (handle_error() re-raises), unlike a manual acquire/release.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3155
3156
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003157@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3158class TestSendfile(unittest.TestCase):
3159
Victor Stinner8c663fd2017-11-08 14:44:44 -08003160 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003161 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003162 not sys.platform.startswith("solaris") and \
3163 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003164 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3165 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003166 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3167 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003168
@classmethod
def setUpClass(cls):
    # Keep the value returned by threading_setup() so that
    # tearDownClass() can hand it back to threading_cleanup(), then
    # write the payload every test sends to TESTFN.
    cls.key = threading_helper.threading_setup()
    create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003173
@classmethod
def tearDownClass(cls):
    # Undo setUpClass(): threading cleanup first, then drop TESTFN.
    threading_helper.threading_cleanup(*cls.key)
    os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003178
def setUp(self):
    # Start a server on a free port and connect a client socket to it.
    self.server = SendfileTestServer((socket_helper.HOST, 0))
    self.server.start()
    client = self.client = socket.socket()
    client.connect((self.server.host, self.server.port))
    client.settimeout(1)
    # synchronize by waiting for "220 ready" response
    client.recv(1024)
    self.sockno = client.fileno()
    # Source file for the sendfile() calls, and its descriptor.
    self.file = open(os_helper.TESTFN, 'rb')
    self.fileno = self.file.fileno()
3190
def tearDown(self):
    # Close the source file and the client, then stop the server unless
    # the test already did so.
    self.file.close()
    self.client.close()
    server = self.server
    if server.running:
        server.stop()
    self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003197
def sendfile_wrapper(self, *args, **kwargs):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().
    """
    # errno values meaning "we have to retry send data".
    retry_errnos = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            return os.sendfile(*args, **kwargs)
        except OSError as err:
            # Anything else, including ECONNRESET (disconnected), is
            # passed on to the caller.
            if err.errno not in retry_errnos:
                raise
3214
def test_send_whole_file(self):
    # normal send
    total_sent = 0
    offset = 0
    nbytes = 4096
    while total_sent < len(self.DATA):
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
        if sent == 0:
            break
        offset += sent
        total_sent += sent
        # assertLessEqual reports both values on failure, unlike
        # assertTrue(sent <= nbytes).
        self.assertLessEqual(sent, nbytes)
        self.assertEqual(offset, total_sent)

    self.assertEqual(total_sent, len(self.DATA))
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    # Let the server see the connection close before reading its buffer.
    self.server.wait()
    data = self.server.handler_instance.get_data()
    self.assertEqual(len(data), len(self.DATA))
    self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003236
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset
    total_sent = 0
    offset = len(self.DATA) // 2
    must_send = len(self.DATA) - offset
    nbytes = 4096
    while total_sent < must_send:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
        if sent == 0:
            break
        offset += sent
        total_sent += sent
        # assertLessEqual reports both values on failure, unlike
        # assertTrue(sent <= nbytes).
        self.assertLessEqual(sent, nbytes)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    # Let the server see the connection close before reading its buffer.
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Only the second half of DATA should have been transmitted.
    expected = self.DATA[len(self.DATA) // 2:]
    self.assertEqual(total_sent, len(expected))
    self.assertEqual(len(data), len(expected))
    self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003259
def test_offset_overflow(self):
    # An offset larger than the file size must result in nothing being sent.
    past_eof = len(self.DATA) + 4096
    try:
        sent = os.sendfile(self.sockno, self.fileno, past_eof, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(sent, 0)
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
3276
def test_invalid_offset(self):
    # A negative offset must be rejected with EINVAL.
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    failure = caught.exception
    self.assertEqual(failure.errno, errno.EINVAL)
3281
def test_keywords(self):
    # Keyword arguments should be supported
    basic = dict(out_fd=self.sockno, in_fd=self.fileno,
                 offset=0, count=4096)
    os.sendfile(**basic)
    if self.SUPPORT_HEADERS_TRAILERS:
        # The extended keywords are only passed when the test class says
        # headers/trailers are supported.
        os.sendfile(**basic, headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003290
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003291 # --- headers / trailers tests
3292
Serhiy Storchaka43767632013-11-03 21:31:38 +02003293 @requires_headers_trailers
def test_headers(self):
    # The first call sends both headers plus the first chunk of the file;
    # the rest of the expected payload is then sent chunk by chunk.
    expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
    goal = len(expected_data)

    sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                       headers=[b"x" * 512, b"y" * 256])
    self.assertLessEqual(sent, 512 + 256 + 4096)
    total_sent = sent
    offset = 4096
    while total_sent < goal:
        nbytes = min(goal - total_sent, 4096)
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        self.assertLessEqual(sent, nbytes)
        total_sent += sent
        offset += sent

    self.assertEqual(total_sent, goal)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    # Hashes are compared rather than the payloads themselves.
    self.assertEqual(hash(received), hash(expected_data))
3317
3318 @requires_headers_trailers
def test_trailers(self):
    # Send the first 5 bytes of a 6-byte file followed by two trailers.
    TESTFN2 = os_helper.TESTFN + "2"
    self.addCleanup(os_helper.unlink, TESTFN2)
    create_file(TESTFN2, b"abcdef")

    with open(TESTFN2, 'rb') as src:
        os.sendfile(self.sockno, src.fileno(), 0, 5,
                    trailers=[b"123456", b"789"])
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcde123456789")
3333
3334 @requires_headers_trailers
3335 @requires_32b
def test_headers_overflow_32bits(self):
    # 2**15 headers of 2**16 bytes each (2**31 bytes in total) must be
    # rejected with EINVAL.
    self.server.handler_instance.accumulate = False
    oversized = [b"x" * 2**16] * 2**15
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, 0, 0, headers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
3342
3343 @requires_headers_trailers
3344 @requires_32b
def test_trailers_overflow_32bits(self):
    # 2**15 trailers of 2**16 bytes each (2**31 bytes in total) must be
    # rejected with EINVAL.
    self.server.handler_instance.accumulate = False
    oversized = [b"x" * 2**16] * 2**15
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, 0, 0, trailers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003351
Serhiy Storchaka43767632013-11-03 21:31:38 +02003352 @requires_headers_trailers
3353 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3354 'test needs os.SF_NODISKIO')
def test_flags(self):
    # EBUSY and EAGAIN are tolerated; any other OSError propagates.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno in tolerated:
            return
        raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003362
3363
def supports_extended_attributes():
    # Return True if os.setxattr() exists and setting a "user.test"
    # attribute on a freshly created temporary file succeeds.
    if not hasattr(os, "setxattr"):
        return False

    probe = os_helper.TESTFN
    try:
        with open(probe, "xb", 0) as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                supported = False
            else:
                supported = True
    finally:
        # The probe file is removed whether or not the attempt succeeded.
        os_helper.unlink(probe)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003378
3379
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the full get/set/remove/list scenario on a fresh file.  ``s``
        # converts attribute names to the type under test, and ``kwargs``
        # is forwarded to every call except listxattr().
        fn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, fn)
        create_file(fn)

        def check_errno(expected, func, *args):
            # func(fn, *args, **kwargs) must raise OSError with ``expected``.
            with self.assertRaises(OSError) as caught:
                func(fn, *args, **kwargs)
            self.assertEqual(caught.exception.errno, expected)

        name1 = s("user.test")
        name2 = s("user.test2")

        # The attribute does not exist yet.
        check_errno(errno.ENODATA, getxattr, name1)

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Create it empty, then replace its value.
        setxattr(fn, name1, b"", **kwargs)
        expected = set(init_xattr)
        expected.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, name1, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both fail.
        check_errno(errno.EEXIST, setxattr, name1, b"bye", os.XATTR_CREATE)
        check_errno(errno.ENODATA, setxattr, name2, b"bye", os.XATTR_REPLACE)

        setxattr(fn, name2, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected)
        removexattr(fn, name1, **kwargs)

        check_errno(errno.ENODATA, getxattr, name1)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, name2, **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(fn, name1, big_value, **kwargs)
        self.assertEqual(getxattr(fn, name1, **kwargs), big_value)
        removexattr(fn, name1, **kwargs)

        # Finally set a hundred attributes and check they are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(fn, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each run.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def via_fd(func, mode, *open_args):
            # Wrap ``func`` so it takes a path, opens it with the given
            # mode and calls ``func`` on the resulting file descriptor.
            def wrapper(path, *args):
                with open(path, mode, *open_args) as fp:
                    return func(fp.fileno(), *args)
            return wrapper

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb", 0),
                           via_fd(os.removexattr, "wb", 0),
                           via_fd(os.listxattr, "rb"))
3463
3464
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked portably, so this only
        verifies that both reported dimensions are non-negative.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            unavailable = (sys.platform == "win32"
                           or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if unavailable:
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        for dimension in (size.columns, size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  If stty succeeded, get_terminal_size() should work too.
        """
        stty_errors = (FileNotFoundError, subprocess.CalledProcessError,
                       PermissionError)
        try:
            output = subprocess.check_output(
                ["stty", "size"], stderr=subprocess.DEVNULL, text=True
            )
        except stty_errors:
            self.skipTest("stty invocation failed")
        fields = output.split()
        # The first stty field is compared with .lines and the second with
        # .columns, hence the reversed order.
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            unavailable = (sys.platform == "win32"
                           or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if unavailable:
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
3512
3513
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # With an explicit MFD_CLOEXEC flag the descriptor must be valid,
        # non-inheritable and writable.
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Without flags the descriptor must be non-inheritable as well.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3529
3530
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        class Str(str):
            pass

        # Prefer the unencodable / undecodable test names when the helper
        # module provides them; otherwise fall back to the plain TESTFN.
        unencodable = os_helper.TESTFN_UNENCODABLE
        if unencodable is None:
            decoded = os_helper.TESTFN
        else:
            decoded = unencodable
        self.unicode_filenames = [decoded, Str(decoded)]

        undecodable = os_helper.TESTFN_UNDECODABLE
        if undecodable is None:
            encoded = os.fsencode(os_helper.TESTFN)
        else:
            encoded = undecodable
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Every entry is (filenames to try, os function, *extra arguments).
        # Each call is expected to fail, and an OSError must carry the very
        # object that was passed in as its ``filename``.
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_str = self.unicode_filenames
        on_windows = sys.platform == "win32"

        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_str, os.rename, "dst"),
                (only_str, os.replace, "dst"),
                (only_str, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions that are tried with every filename type.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_str, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                plain = isinstance(name, (str, bytes))
                try:
                    if plain:
                        func(name, *func_args)
                    else:
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3623
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() may return None; otherwise it must be a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3632
Victor Stinnerdaf45552013-08-28 00:53:59 +02003633
class FDInheritanceTests(unittest.TestCase):
    def _open_readonly(self):
        # Open this source file read-only and register the descriptor to
        # be closed when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_readonly()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_readonly()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        duplicate = os.dup(nul_fd)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_readonly()

        # inheritable by default
        target = self._open_readonly()
        self.assertEqual(os.dup2(source, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        target = self._open_readonly()
        self.assertEqual(os.dup2(source, target, inheritable=False), target)
        self.assertFalse(os.get_inheritable(target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pair = os.openpty()
        for fd in pair:
            self.addCleanup(os.close, fd)
        for fd in pair:
            self.assertEqual(os.get_inheritable(fd), False)
3726
3727
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = os_helper.TESTFN
        if os.name == 'nt':
            # Bytes paths are not exercised on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        valid_paths = [p for p in candidates if p is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Function not available on this platform.
                    continue

                # str, bytes and path-like objects wrapping them are accepted.
                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object whose __fspath__() gives an int is not.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        # The error message must name the type __fspath__() returned.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in cases:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
3792
Brett Cannon3f9183b2016-08-26 14:44:48 -07003793
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        # The descriptor starts out blocking; toggling the mode off and
        # back on must be reflected by get_blocking().
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3807
3808
Yury Selivanov97e2e062014-09-26 12:33:06 -04003809
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
3814
3815
class TestDirEntry(unittest.TestCase):
    def setUp(self):
        # Each test runs in a fresh directory that is removed afterwards.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        # os.DirEntry must not be instantiable from Python code.
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        # Pickling a DirEntry must raise TypeError.
        create_file(os.path.join(self.path, "file.txt"), b'python')
        entry = list(os.scandir(self.path)).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # pickle.dumps() is called with the entry only.  The second
        # positional parameter of dumps() is the protocol, so passing the
        # result of create_file() there was never meaningful.
        self.assertRaises(TypeError, pickle.dumps, entry)
3832
3833
Victor Stinner6036e442015-03-08 01:58:04 +01003834class TestScandir(unittest.TestCase):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003835 check_no_resource_warning = warnings_helper.check_no_resource_warning
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003836
def setUp(self):
    # Create a fresh test directory, remember it in both str and bytes
    # form, and register it for removal after the test.
    root = os.path.realpath(os_helper.TESTFN)
    self.path = root
    self.bytes_path = os.fsencode(root)
    self.addCleanup(os_helper.rmtree, root)
    os.mkdir(root)
3842
def create_file(self, name="file.txt"):
    # Create a file containing b'python' inside the test directory and
    # return its full path.  A bytes ``name`` is joined with the bytes
    # form of the directory path, anything else with self.path.
    if isinstance(name, bytes):
        parent = self.bytes_path
    else:
        parent = self.path
    filename = os.path.join(parent, name)
    create_file(filename, b'python')
    return filename
3848
def get_entries(self, names):
    # Scan the test directory, check that the sorted entry names equal
    # ``names`` and return a {name: DirEntry} mapping.
    entries = {entry.name: entry for entry in os.scandir(self.path)}
    self.assertEqual(sorted(entries), names)
    return entries
3854
def assert_stat_equal(self, stat1, stat2, skip_fields):
    # Compare two stat results.  When ``skip_fields`` is true, compare
    # the st_* attributes one by one, leaving out st_dev, st_ino and
    # st_nlink; otherwise compare the two objects as a whole.
    if not skip_fields:
        self.assertEqual(stat1, stat2)
        return

    ignored = ("st_dev", "st_ino", "st_nlink")
    for attr in dir(stat1):
        if attr.startswith("st_") and attr not in ignored:
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))
3867
def test_uninstantiable(self):
    # The scandir iterator type must not be instantiable from Python code.
    # The ``with`` block closes the iterator even if the assertion fails.
    with os.scandir(self.path) as scandir_iter:
        self.assertRaises(TypeError, type(scandir_iter))
3872
def test_unpickable(self):
    # Pickling a scandir iterator must raise TypeError.
    self.create_file("file.txt")
    # The ``with`` block closes the iterator even if the assertion fails.
    with os.scandir(self.path) as scandir_iter:
        # Do not pass the file name as a second argument: pickle.dumps()
        # would take it as the protocol and raise TypeError for that
        # reason alone, whether or not the iterator can be pickled.
        self.assertRaises(TypeError, pickle.dumps, scandir_iter)
3879
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
    # Check every DirEntry accessor against os.stat() / os.path for the
    # same path.  ``is_dir`` and ``is_file`` are accepted but not used;
    # ``is_symlink`` only affects which stat fields are compared on Windows.
    self.assertIsInstance(entry, os.DirEntry)
    self.assertEqual(entry.name, name)
    self.assertEqual(entry.path, os.path.join(self.path, name))
    no_follow_ino = os.stat(entry.path, follow_symlinks=False).st_ino
    self.assertEqual(entry.inode(), no_follow_ino)

    # Accessors that follow symlinks.
    entry_stat = os.stat(entry.path)
    followed_mode = entry_stat.st_mode
    self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed_mode))
    self.assertEqual(entry.is_file(), stat.S_ISREG(followed_mode))
    self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

    # Accessors that do not follow symlinks.
    entry_lstat = os.stat(entry.path, follow_symlinks=False)
    link_mode = entry_lstat.st_mode
    self.assertEqual(entry.is_dir(follow_symlinks=False),
                     stat.S_ISDIR(link_mode))
    self.assertEqual(entry.is_file(follow_symlinks=False),
                     stat.S_ISREG(link_mode))

    on_windows = os.name == 'nt'
    self.assert_stat_equal(entry.stat(),
                           entry_stat,
                           on_windows and not is_symlink)
    self.assert_stat_equal(entry.stat(follow_symlinks=False),
                           entry_lstat,
                           on_windows)
3907
def test_attributes(self):
    """Check DirEntry attributes of a directory, a file and links to them."""
    has_link = hasattr(os, 'link')
    has_symlink = os_helper.can_symlink()

    dir_path = os.path.join(self.path, "dir")
    os.mkdir(dir_path)
    file_path = self.create_file("file.txt")

    # (name, is_dir, is_file, is_symlink) for every entry created here,
    # listed in sorted-name order since get_entries() compares sorted names.
    expected = [('dir', True, False, False),
                ('file.txt', False, True, False)]

    if has_link:
        try:
            os.link(file_path, os.path.join(self.path, "link_file.txt"))
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        expected.append(('link_file.txt', False, True, False))

    if has_symlink:
        os.symlink(dir_path, os.path.join(self.path, "symlink_dir"),
                   target_is_directory=True)
        os.symlink(file_path, os.path.join(self.path, "symlink_file.txt"))
        expected.append(('symlink_dir', True, False, True))
        expected.append(('symlink_file.txt', False, True, True))

    entries = self.get_entries([item[0] for item in expected])
    for name, is_dir, is_file, is_symlink in expected:
        self.check_entry(entries[name], name, is_dir, is_file, is_symlink)
3948
def get_entry(self, name):
    """Return the only entry of the scratch directory.

    The directory is scanned through its bytes path when *name* is bytes
    and through its str path otherwise.  Assertions check that exactly
    one entry exists and that it is called *name*.
    """
    if isinstance(name, bytes):
        base = self.bytes_path
    else:
        base = self.path
    found = [*os.scandir(base)]
    self.assertEqual(len(found), 1)

    only = found[0]
    self.assertEqual(only.name, name)
    return only
3957
def create_file_entry(self, name='file.txt'):
    """Create the file *name* and return the matching scandir entry."""
    created = self.create_file(name=name)
    # get_entry() is given the bare file name, not the full path.
    basename = os.path.basename(created)
    return self.get_entry(basename)
3961
def test_current_directory(self):
    """scandir() called without argument lists the current directory."""
    created = self.create_file()
    saved_cwd = os.getcwd()
    try:
        os.chdir(self.path)

        # No parameter is passed: the content of the current directory,
        # which is now the scratch directory, must be listed.
        found = {entry.name: entry for entry in os.scandir()}
        expected = [os.path.basename(created)]
        self.assertEqual(sorted(found.keys()), expected)
    finally:
        # Always restore the working directory for the following tests.
        os.chdir(saved_cwd)
3975
def test_repr(self):
    """repr() of a DirEntry shows the entry name."""
    text = repr(self.create_file_entry())
    self.assertEqual(text, "<DirEntry 'file.txt'>")
3979
def test_fspath_protocol(self):
    """os.fspath() of a DirEntry is the full path of the entry."""
    entry = self.create_file_entry()
    expected = os.path.join(self.path, 'file.txt')
    self.assertEqual(os.fspath(entry), expected)
3983
def test_fspath_protocol_bytes(self):
    """os.fspath() of an entry found through a bytes path is bytes."""
    name = os.fsencode('bytesfile.txt')
    entry = self.create_file_entry(name=name)
    result = os.fspath(entry)
    self.assertIsInstance(result, bytes)
    expected = os.path.join(os.fsencode(self.path), name)
    self.assertEqual(result, expected)
3991
def test_removed_dir(self):
    """Exercise a DirEntry whose directory was removed after the scan."""
    dir_path = os.path.join(self.path, 'dir')

    os.mkdir(dir_path)
    entry = self.get_entry('dir')
    os.rmdir(dir_path)

    on_windows = (os.name == 'nt')

    # On POSIX, is_dir() result depends if scandir() filled d_type or not
    if on_windows:
        self.assertTrue(entry.is_dir())
    self.assertFalse(entry.is_file())
    self.assertFalse(entry.is_symlink())

    if not on_windows:
        # Here inode() still answers while both stat() flavours fail.
        self.assertGreater(entry.inode(), 0)
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        with self.assertRaises(FileNotFoundError):
            entry.stat(follow_symlinks=False)
    else:
        self.assertRaises(FileNotFoundError, entry.inode)
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
4013
def test_removed_file(self):
    """Exercise a DirEntry whose file was removed after the scan."""
    entry = self.create_file_entry()
    os.unlink(entry.path)

    on_windows = (os.name == 'nt')

    self.assertFalse(entry.is_dir())
    # On POSIX, is_file() result depends if scandir() filled d_type or not
    if on_windows:
        self.assertTrue(entry.is_file())
    self.assertFalse(entry.is_symlink())

    if not on_windows:
        # Here inode() still answers while both stat() flavours fail.
        self.assertGreater(entry.inode(), 0)
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        with self.assertRaises(FileNotFoundError):
            entry.stat(follow_symlinks=False)
    else:
        self.assertRaises(FileNotFoundError, entry.inode)
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
4032
def test_broken_symlink(self):
    """Check a DirEntry for a symbolic link whose target was removed."""
    if not os_helper.can_symlink():
        self.skipTest('cannot create symbolic link')

    target = self.create_file("file.txt")
    link_path = os.path.join(self.path, "symlink.txt")
    os.symlink(target, link_path)
    entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
    # Removing the target is what turns the link into a broken one.
    os.unlink(target)

    self.assertGreater(entry.inode(), 0)
    for follow in (True, False):
        if follow:
            self.assertFalse(entry.is_dir())
            self.assertFalse(entry.is_file())  # broken symlink returns False
        else:
            self.assertFalse(entry.is_dir(follow_symlinks=False))
            self.assertFalse(entry.is_file(follow_symlinks=False))
    self.assertTrue(entry.is_symlink())
    with self.assertRaises(FileNotFoundError):
        entry.stat()
    # don't fail
    entry.stat(follow_symlinks=False)
4053
def test_bytes(self):
    """Scanning a bytes path gives entries with bytes name and path."""
    self.create_file("file.txt")

    found = list(os.scandir(os.fsencode(self.path)))
    self.assertEqual(len(found), 1, found)
    entry = found[0]

    self.assertEqual(entry.name, b'file.txt')
    expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))
    self.assertEqual(entry.path, expected_path)
4065
def test_bytes_like(self):
    """Bytes-like paths are accepted with a DeprecationWarning."""
    self.create_file("file.txt")
    encoded_dir = os.fsencode(self.path)

    for wrapper in (bytearray, memoryview):
        bytes_like = wrapper(encoded_dir)
        with self.assertWarns(DeprecationWarning):
            found = list(os.scandir(bytes_like))
        self.assertEqual(len(found), 1, found)
        entry = found[0]

        self.assertEqual(entry.name, b'file.txt')
        expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))
        self.assertEqual(entry.path, expected_path)
        # Whatever the bytes-like input type, the results are plain bytes.
        self.assertIs(type(entry.name), bytes)
        self.assertIs(type(entry.path), bytes)
4081
@unittest.skipUnless(os.listdir in os.supports_fd,
                     'fd support for listdir required for this test.')
def test_fd(self):
    """scandir() accepts an open file descriptor of a directory."""
    self.assertIn(os.scandir, os.supports_fd)
    self.create_file('file.txt')
    expected_names = ['file.txt']
    if os_helper.can_symlink():
        os.symlink('file.txt', os.path.join(self.path, 'link'))
        expected_names.append('link')

    dir_fd = os.open(self.path, os.O_RDONLY)
    try:
        with os.scandir(dir_fd) as scan:
            entries = list(scan)
        names = [item.name for item in entries]
        self.assertEqual(sorted(names), expected_names)
        self.assertEqual(names, os.listdir(dir_fd))

        stat_takes_dir_fd = os.stat in os.supports_dir_fd
        for item in entries:
            # With a file descriptor, the entry path is the bare name.
            self.assertEqual(item.path, item.name)
            self.assertEqual(os.fspath(item), item.name)
            self.assertEqual(item.is_symlink(), item.name == 'link')
            if stat_takes_dir_fd:
                followed = os.stat(item.name, dir_fd=dir_fd)
                self.assertEqual(item.stat(), followed)
                not_followed = os.stat(item.name, dir_fd=dir_fd,
                                       follow_symlinks=False)
                self.assertEqual(item.stat(follow_symlinks=False),
                                 not_followed)
    finally:
        os.close(dir_fd)
4110
def test_empty_path(self):
    """An empty path is rejected with FileNotFoundError."""
    with self.assertRaises(FileNotFoundError):
        os.scandir('')
4113
def test_consume_iterator_twice(self):
    """An exhausted scandir iterator yields nothing when consumed again."""
    self.create_file("file.txt")
    scan = os.scandir(self.path)

    first_pass = list(scan)
    self.assertEqual(len(first_pass), 1, first_pass)

    # check that consuming the iterator twice doesn't raise exception
    second_pass = list(scan)
    self.assertEqual(len(second_pass), 0, second_pass)
4124
def test_bad_path_type(self):
    """Objects that are not path-like are rejected with TypeError."""
    for bad_path in (1.234, {}, []):
        with self.assertRaises(TypeError):
            os.scandir(bad_path)
4128
def test_close(self):
    """close() can be called repeatedly on a partly consumed iterator."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    scan = os.scandir(self.path)
    # Consume a single entry so that the iterator is not exhausted.
    next(scan)
    scan.close()
    # multiple closes
    scan.close()
    # Once closed, dropping the iterator is expected to stay silent.
    with self.check_no_resource_warning():
        del scan
4139
def test_context_manager(self):
    """Leaving the with block allows dropping the iterator silently."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with os.scandir(self.path) as scan:
        # Consume a single entry so that the iterator is not exhausted.
        next(scan)
    with self.check_no_resource_warning():
        del scan
4147
def test_context_manager_close(self):
    """An explicit close() inside the with block is accepted."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with os.scandir(self.path) as scan:
        next(scan)
        # The with statement still runs its own exit step after this.
        scan.close()
4154
def test_context_manager_exception(self):
    """An exception leaving the with block propagates to the caller."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with self.assertRaises(ZeroDivisionError):
        with os.scandir(self.path) as scan:
            next(scan)
            # Deliberate error raised while the iterator is still open.
            1/0
    # After the failed block, dropping the iterator must stay silent.
    with self.check_no_resource_warning():
        del scan
4164
def test_resource_warning(self):
    """Dropping an unfinished iterator warns, an exhausted one does not."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)

    # Partly consumed and never closed: a ResourceWarning is expected.
    scan = os.scandir(self.path)
    next(scan)
    with self.assertWarns(ResourceWarning):
        del scan
        support.gc_collect()

    # exhausted iterator
    scan = os.scandir(self.path)
    list(scan)
    with self.check_no_resource_warning():
        del scan
4178
Victor Stinner6036e442015-03-08 01:58:04 +01004179
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # A bytes argument compares equal to the result.
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # A str argument compares equal to the result.
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        # The same path is wrapped once as str and once as bytes.
        for path in ("path/like/object", b"path/like/object"):
            wrapped = FakePath(path)

            self.assertEqual(path, self.fspath(wrapped))
            self.assertEqual(b"path/like/object", os.fsencode(wrapped))
            self.assertEqual("path/like/object", os.fsdecode(wrapped))

    def test_pathlike(self):
        converted = self.fspath(FakePath('#feelthegil'))
        self.assertEqual('#feelthegil', converted)
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects without __fspath__ that are neither str nor bytes.
        vapor = type('blah', (), {})
        for garbage in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(garbage)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        with self.assertRaises(TypeError):
            self.fspath(FakePath(42))
        # __fspath__ attribute that is not callable.
        broken = type('foo', (), {})
        broken.__fspath__ = 1
        with self.assertRaises(TypeError):
            self.fspath(broken())
        # __fspath__ raises an exception.
        with self.assertRaises(ZeroDivisionError):
            self.fspath(FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004236
Victor Stinnerc29b5852017-11-02 07:28:27 -07004237
class TimesTests(unittest.TestCase):
    """Tests for os.times()."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every field of the result must be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        if os.name != 'nt':
            return
        # On Windows these three fields are expected to be zero.
        self.assertEqual(result.children_user, 0)
        self.assertEqual(result.children_system, 0)
        self.assertEqual(result.elapsed, 0)
4252
4253
# When os._fspath exists, os.fspath is the C version and the pure Python
# implementation needs its own run of the tests; otherwise TestPEP519 has
# already tested the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Explicitly test the pure Python implementation of os.fspath()."""

        # Override the function under test; the inherited tests are reused.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004262
4263
Fred Drake2e2be372001-09-20 21:33:42 +00004264if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004265 unittest.main()