blob: 0db7d30f6385e98755a0f23f5272e33cb7b43fcc [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030033from test.support import socket_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070034from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020035
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import pwd
50 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010051except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050052 all_users = []
53try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020055except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020056 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020057
Berker Peksagce643912015-05-06 06:33:17 +030058from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020059from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000060
Victor Stinner923590e2016-03-24 09:11:48 +010061
# True only when os.geteuid() exists on this platform and reports uid 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# bool() keeps the flag a real boolean when sys.thread_info is missing or
# its version field is empty/None.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# The platform check comes first so that os.getgid() is only called on
# FreeBSD.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
77
Victor Stinner923590e2016-03-24 09:11:48 +010078
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists.

    *name* is the attribute looked up on the os module.  When the attribute
    is missing the test is skipped with the reason "requires os.<name>".
    """
    return unittest.skipUnless(hasattr(os, name), f'requires os.{name}')
81
82
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so FileExistsError is raised when *filename* already exists instead of
    silently overwriting it.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)
86
87
class MiscTests(unittest.TestCase):
    """Tests for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        cwd = os.getcwd()
        self.assertIsInstance(cwd, str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        min_len = 2000   # characters
        dirlen = 200     # characters
        dirname = 'python_test_dir_'
        dirname = dirname + ('a' * (dirlen - len(dirname)))

        with tempfile.TemporaryDirectory() as tmpdir:
            with support.change_cwd(tmpdir) as path:
                expected = path

                while True:
                    cwd = os.getcwd()
                    self.assertEqual(cwd, expected)

                    need = min_len - (len(cwd) + len(os.path.sep))
                    if need <= 0:
                        break
                    # need is strictly positive from here on, so only the
                    # length comparison is required to trim the last
                    # component down to the remaining budget.
                    if len(dirname) > need:
                        dirname = dirname[:need]

                    path = os.path.join(path, dirname)
                    try:
                        os.mkdir(path)
                        # On Windows, chdir() can fail
                        # even if mkdir() succeeded
                        os.chdir(path)
                    except FileNotFoundError:
                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                        # ERROR_FILENAME_EXCED_RANGE (206) errors
                        # ("The filename or extension is too long")
                        break
                    except OSError as exc:
                        if exc.errno == errno.ENAMETOOLONG:
                            break
                        raise

                    expected = path

                if support.verbose:
                    print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        cwd = os.getcwdb()
        self.assertIsInstance(cwd, bytes)
        # The bytes and str variants must describe the same directory.
        self.assertEqual(os.fsdecode(cwd), os.getcwd())
150
151
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the file-descriptor level functions of the os module.

    The tests operate on support.TESTFN; setUp()/tearDown() remove that path
    before and after every test.
    """

    def setUp(self):
        # lexists() rather than exists(): a dangling symlink must be removed
        # as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the refcount of its argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        with os.fdopen(fd, *args):
            pass

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200375
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the result objects returned by os.stat() and os.statvfs().

    setUp() creates support.TESTFN containing the three bytes b"ABC"; the
    file is removed again by a registered cleanup.
    """

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check index and attribute access on os.stat(fname).

        *fname* names a file that must be exactly 3 bytes long.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Indexed access yields whole seconds while the attribute is
                # a float, so truncate before comparing.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  TypeError is tolerated
        # but, as before, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  TypeError is tolerated
        # but, as before, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that result.st_file_attributes is an int within 32 bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
603
Victor Stinner47aacc82015-06-12 17:26:23 +0200604
605class UtimeTests(unittest.TestCase):
606 def setUp(self):
607 self.dirname = support.TESTFN
608 self.fname = os.path.join(self.dirname, "f1")
609
610 self.addCleanup(support.rmtree, self.dirname)
611 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100612 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200613
Victor Stinner47aacc82015-06-12 17:26:23 +0200614 def support_subsecond(self, filename):
615 # Heuristic to check if the filesystem supports timestamp with
616 # subsecond resolution: check if float and int timestamps are different
617 st = os.stat(filename)
618 return ((st.st_atime != st[7])
619 or (st.st_mtime != st[8])
620 or (st.st_ctime != st[9]))
621
622 def _test_utime(self, set_time, filename=None):
623 if not filename:
624 filename = self.fname
625
626 support_subsecond = self.support_subsecond(filename)
627 if support_subsecond:
628 # Timestamp with a resolution of 1 microsecond (10^-6).
629 #
630 # The resolution of the C internal function used by os.utime()
631 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
632 # test with a resolution of 1 ns requires more work:
633 # see the issue #15745.
634 atime_ns = 1002003000 # 1.002003 seconds
635 mtime_ns = 4005006000 # 4.005006 seconds
636 else:
637 # use a resolution of 1 second
638 atime_ns = 5 * 10**9
639 mtime_ns = 8 * 10**9
640
641 set_time(filename, (atime_ns, mtime_ns))
642 st = os.stat(filename)
643
644 if support_subsecond:
645 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
646 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
647 else:
648 self.assertEqual(st.st_atime, atime_ns * 1e-9)
649 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
650 self.assertEqual(st.st_atime_ns, atime_ns)
651 self.assertEqual(st.st_mtime_ns, mtime_ns)
652
653 def test_utime(self):
654 def set_time(filename, ns):
655 # test the ns keyword parameter
656 os.utime(filename, ns=ns)
657 self._test_utime(set_time)
658
659 @staticmethod
660 def ns_to_sec(ns):
661 # Convert a number of nanosecond (int) to a number of seconds (float).
662 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
663 # issue, os.utime() rounds towards minus infinity.
664 return (ns * 1e-9) + 0.5e-9
665
666 def test_utime_by_indexed(self):
667 # pass times as floating point seconds as the second indexed parameter
668 def set_time(filename, ns):
669 atime_ns, mtime_ns = ns
670 atime = self.ns_to_sec(atime_ns)
671 mtime = self.ns_to_sec(mtime_ns)
672 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
673 # or utime(time_t)
674 os.utime(filename, (atime, mtime))
675 self._test_utime(set_time)
676
677 def test_utime_by_times(self):
678 def set_time(filename, ns):
679 atime_ns, mtime_ns = ns
680 atime = self.ns_to_sec(atime_ns)
681 mtime = self.ns_to_sec(mtime_ns)
682 # test the times keyword parameter
683 os.utime(filename, times=(atime, mtime))
684 self._test_utime(set_time)
685
686 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
687 "follow_symlinks support for utime required "
688 "for this test.")
689 def test_utime_nofollow_symlinks(self):
690 def set_time(filename, ns):
691 # use follow_symlinks=False to test utimensat(timespec)
692 # or lutimes(timeval)
693 os.utime(filename, ns=ns, follow_symlinks=False)
694 self._test_utime(set_time)
695
696 @unittest.skipUnless(os.utime in os.supports_fd,
697 "fd support for utime required for this test.")
698 def test_utime_fd(self):
699 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100700 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200701 # use a file descriptor to test futimens(timespec)
702 # or futimes(timeval)
703 os.utime(fp.fileno(), ns=ns)
704 self._test_utime(set_time)
705
706 @unittest.skipUnless(os.utime in os.supports_dir_fd,
707 "dir_fd support for utime required for this test.")
708 def test_utime_dir_fd(self):
709 def set_time(filename, ns):
710 dirname, name = os.path.split(filename)
711 dirfd = os.open(dirname, os.O_RDONLY)
712 try:
713 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
714 os.utime(name, dir_fd=dirfd, ns=ns)
715 finally:
716 os.close(dirfd)
717 self._test_utime(set_time)
718
719 def test_utime_directory(self):
720 def set_time(filename, ns):
721 # test calling os.utime() on a directory
722 os.utime(filename, ns=ns)
723 self._test_utime(set_time, filename=self.dirname)
724
def _test_utime_current(self, set_time):
    """Check that *set_time* stamps self.fname with (about) the current time.

    *set_time* is called with self.fname and is expected to set the file's
    timestamp to "now"; the resulting st_mtime is compared with the clock
    value sampled just before the call.
    """
    # Sample the system clock first, then let the callback stamp the file.
    current = time.time()
    set_time(self.fname)

    # Without sub-second timestamps a whole second of slack is allowed.
    #
    # Otherwise: on Windows, the usual resolution of time.time() is
    # 15.6 ms.  bpo-30649: Tolerate 50 ms for slow Windows buildbots.
    # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms, so 50 ms is
    # used on other platforms as well.
    delta = 0.050 if self.support_subsecond(self.fname) else 1.0

    mtime = os.stat(self.fname).st_mtime
    details = (mtime, current, mtime - current)
    msg = "st_time=%r, current=%r, dt=%r" % details
    self.assertAlmostEqual(mtime, current, delta=delta, msg=msg)
746
def test_utime_current(self):
    """os.utime(path) with no times sets the timestamp to the current time."""
    def set_time(filename):
        # Set to the current time in the new way.  Use the path handed in
        # by _test_utime_current() rather than reaching back to self.fname,
        # so the helper really stamps the file it is asked to stamp.
        os.utime(filename)
    self._test_utime_current(set_time)
752
def test_utime_current_old(self):
    """os.utime(path, None) sets the timestamp to the current time."""
    def set_time(filename):
        # Set to the current time in the old explicit way.  Use the path
        # handed in by _test_utime_current() rather than self.fname, so
        # the helper really stamps the file it is asked to stamp.
        os.utime(filename, None)
    self._test_utime_current(set_time)
758
def get_file_system(self, path):
    """Return the file system name for the volume holding *path*.

    Only implemented for Windows (via GetVolumeInformationW); returns None
    on other platforms or when the query fails.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    volume_root = drive + '\\'
    name_buffer = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        volume_root, None, 0,
        None, None, None,
        name_buffer, len(name_buffer))
    if not succeeded:
        return None
    return name_buffer.value
771
def test_large_time(self):
    """A timestamp beyond 2038 round-trips through os.utime()/os.stat()."""
    # Many filesystems are limited to the year 2038. At least, the test
    # passes with the NTFS filesystem.
    filesystem = self.get_file_system(self.dirname)
    if filesystem != "NTFS":
        self.skipTest("requires NTFS")

    far_future = 5000000000  # some day in 2128
    os.utime(self.fname, (far_future, far_future))
    observed = os.stat(self.fname).st_mtime
    self.assertEqual(observed, far_future)
781
def test_utime_invalid_arguments(self):
    """os.utime() rejects malformed or unsupported argument combinations."""
    target = self.fname

    # seconds and nanoseconds parameters are mutually exclusive
    self.assertRaises(ValueError, os.utime, target, (5, 5), ns=(5, 5))

    # Anything other than a 2-tuple is a TypeError, for both the
    # positional times argument and the ns keyword.
    malformed = ([5, 5], (5,), (5, 5, 5))
    for times in malformed:
        self.assertRaises(TypeError, os.utime, target, times)
    for ns in malformed:
        self.assertRaises(TypeError, os.utime, target, ns=ns)

    # Optional features must fail cleanly where the platform lacks them.
    if os.utime not in os.supports_follow_symlinks:
        self.assertRaises(NotImplementedError, os.utime,
                          target, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(target, 'wb', 0) as stream:
            self.assertRaises(TypeError, os.utime,
                              stream.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        self.assertRaises(NotImplementedError, os.utime,
                          target, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200809
@support.cpython_only
def test_issue31577(self):
    """utime() raises TypeError, not a crash, for a bad ns element."""
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def make_bad_int(divmod_result):
        # Object whose __divmod__ returns an arbitrary caller-chosen value.
        class BadInt:
            def __divmod__(*args):
                return divmod_result
        return BadInt()

    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(make_bad_int(bogus_result), 1))
825
Victor Stinner47aacc82015-06-12 17:26:23 +0200826
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000827from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000828
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol checks, and more, on os.environ."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown() can put it back.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name in reference:
            os.environ[name] = reference[name]

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY%d" % i: "VALUE%d" % i for i in (1, 2, 3)}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join(f'{key!r}: {value!r}'
                          for key, value in env.items())
        self.assertEqual(repr(env), 'environ({' + pairs + '})')

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        env_with_path = {'PATH': os.pathsep.join(search_dirs)}

        original_environ = os.environ
        try:
            os.environ = dict(env_with_path)
            # Test that defaulting to os.environ works.
            for implicit in (os.get_exec_path(),
                             os.get_exec_path(env=None)):
                self.assertSequenceEqual(search_dirs, implicit)
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs,
                                 os.get_exec_path(env_with_path))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            bytes_envs = ({b'PATH': b'abc'},
                          {b'PATH': 'abc'},
                          {'PATH': b'abc'})
            for env in bytes_envs:
                self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s"
                          % (fs_encoding,))
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def read_in_child():
            # Ask a fresh interpreter what it sees for the variable.
            completed = subprocess.run([sys.executable, '-c', code],
                                       check=True,
                                       stdout=subprocess.PIPE, text=True)
            return completed.stdout.rstrip()

        with support.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(read_in_child(), repr(value))

            os.unsetenv(name)
            self.assertEqual(read_in_child(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        rejected = (OSError, ValueError)
        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
            self.assertRaises(rejected, os.putenv, name, "value")
            self.assertRaises(rejected, os.unsetenv, name)

        if sys.platform != "win32":
            return
        # On Windows, an environment variable string ("name=value" string)
        # is limited to 32,767 characters
        longstr = 'x' * 32_768
        self.assertRaises(ValueError, os.putenv, longstr, "1")
        self.assertRaises(ValueError, os.putenv, "X", longstr)
        self.assertRaises(ValueError, os.unsetenv, longstr)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        def lookup():
            os.environ[missing]

        def remove():
            del os.environ[missing]

        # Both failures must report the original key object and suppress
        # the exception context.
        for action in (lookup, remove):
            with self.assertRaises(KeyError) as cm:
                action()
            self.assertIs(cm.exception.args[0], missing)
            self.assertTrue(cm.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        walker = iter(collection)
        new_key = "__new_key__"

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Without a usable shell there is nothing to compare against.
        if not unix_shell or not os.path.exists(unix_shell):
            return

        with os.popen(f"{unix_shell} -c 'echo ${var}'") as pipe:
            seen = pipe.read().strip()

        self.assertEqual(expected, seen)

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        actual = os.environ | new_vars_dict
        self.assertDictEqual(expected, actual)
        self.assertEqual('3', actual[overridden_key])

        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))

        # The process environment itself must be untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        os.environ |= new_vars_dict
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        os_environ_copy = os.environ.copy()
        for bad_mapping in ({1: '_A_'}, {'_A_': 1}):
            with self.assertRaises(TypeError):
                os.environ |= bad_mapping

        # Check nothing was added.
        self.assertEqual(os_environ_copy, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = {**os.environ, **dict(new_vars_items)}

        os.environ |= new_vars_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**new_vars_dict, **os.environ}

        actual = new_vars_dict | os.environ
        self.assertDictEqual(expected, actual)
        self.assertEqual(original_value, actual[overridden_key])

        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))

        # The process environment itself must be untouched.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1120
Victor Stinner6d101392013-04-14 16:35:04 +02001121
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to tests both functions with the same code base
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        testfn = support.TESTFN
        self.addCleanup(support.rmtree, testfn)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(testfn, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(testfn, "TEST2")
        tmp4_path = join(testfn, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        regular_files = (tmp1_path, tmp2_path, tmp3_path, tmp4_path,
                         tmp5_path)
        for path in regular_files:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            expected_dirs = ["SUB21", "link"]
            expected_files = ["broken_link", "broken_link2", "broken_link3",
                              "tmp3"]
        else:
            expected_dirs = ["SUB21"]
            expected_files = ["tmp3"]
        self.sub2_tree = (sub2_path, expected_dirs, expected_files)

        # Make SUB21 unreadable.  If the directory can still be listed
        # afterwards, drop it from both the tree on disk and the expected
        # result.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            self.sub2_tree[1].pop(0)

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        visited[0][1].sort()
        visited[sub2_index][-1].sort()
        visited[sub2_index][1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for entry in self.walk(walk_path):
            visited.append(entry)
            subdirs = entry[1]
            # Don't descend into SUB1.
            if 'SUB1' in subdirs:
                # Note that this also mutates the dirs we appended to
                # visited!
                subdirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        visited[3][1].sort()
        visited[sub2_index][-1].sort()
        visited[sub2_index][1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        entries = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in entries:
            if root != self.link_path:
                continue
            self.assertEqual(dirs, [])
            self.assertEqual(files, ["tmp4"])
            break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        moved_name = 'SUB1'
        original = os.path.join(root, moved_name)
        renamed = os.path.join(root, moved_name + '.new')
        # Rename SUB1 after the walk has already listed it.
        os.rename(original, renamed)
        try:
            roots = [entry[0] for entry in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(original, roots)
            self.assertNotIn(renamed, roots)
            for other in dirs:
                if other == moved_name:
                    continue
                self.assertIn(os.path.join(root, other), roots)
        finally:
            os.rename(renamed, original)

    def test_walk_many_open_files(self):
        depth = 30
        base = os.path.join(support.TESTFN, 'deep')
        deepest = os.path.join(base, *(['d'] * depth))
        os.makedirs(deepest)

        # Bottom-up: advance 100 walkers in lockstep from the deepest
        # directory back up to the base.
        walkers = [self.walk(base, topdown=False) for _ in range(100)]
        current = deepest
        for level in range(depth + 1):
            expected = (current, ['d'] if level else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.dirname(current)

        # Top-down: the same, from the base down to the deepest directory.
        walkers = [self.walk(base, topdown=True) for _ in range(100)]
        current = base
        for level in range(depth + 1):
            expected = (current, ['d'] if level < depth else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.join(current, 'd')
1316
Charles-François Natali7372b062012-02-05 15:15:38 +01001317
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests
        # expect by dropping the directory file descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single override point for subclasses that call os.fwalk()
        # differently.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and self.fwalk() are
        run for every combination of topdown and follow-symlinks settings
        and the directory/file sets reported per root are compared.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if
        # os.open() itself fails there is nothing to close, and the
        # finally clause must not hide that error behind an unbound 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1388
1389
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        # Accept the fwalk()-style keyword and translate it for os.walk().
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top),
                                                     **kwargs):
            dirs = [os.fsdecode(entry) for entry in raw_dirs]
            files = [os.fsdecode(entry) for entry in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Copy any changes the caller made to the decoded lists back
            # into the bytes lists owned by os.walk().
            raw_dirs[:] = [os.fsencode(entry) for entry in dirs]
            raw_files[:] = [os.fsencode(entry) for entry in files]
1402
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on a bytes path while yielding str names.

        Extra positional and keyword arguments are forwarded to os.fwalk()
        unchanged.  The directory file descriptor is yielded as-is.
        """
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Write back any in-place changes the consumer made to the str
            # lists into the bytes lists that os.fwalk() itself uses.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1414
Charles-François Natali7372b062012-02-05 15:15:38 +01001415
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs().

    setUp() creates the support.TESTFN directory that every test works in;
    tearDown() removes whatever part of the nested tree was created.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The requested mode applies to the leaf directory only;
                # the intermediate directory is expected to be 0o775 under
                # the 0o002 umask set above.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so that a failure
        # here cannot change the umask seen by the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        # makedirs() must fail when the target exists but is a regular
        # file, regardless of exist_ok.
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1508
Andrew Svetlov405faed2012-12-25 12:18:09 +02001509
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory at support.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        st = os.stat(support.TESTFN)
        uid, gid = st.st_uid, st.st_gid
        # None of these numeric types is acceptable as a uid or gid.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and check that each change took effect.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            current_gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(current_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and check that each change took effect.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            current_uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(current_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one context on purpose: it is satisfied as soon
        # as either of them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1568
1569
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return the paths (dira, dirb)."""
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory is empty, so the whole chain goes away.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning right above dirb.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail outright.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1609
1610
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Writing must be accepted.  Buffering is disabled (third argument
        # 0) so the write is not held back in a Python-level buffer.  The
        # with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading must report end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001618
Andrew Svetlov405faed2012-12-25 12:18:09 +02001619
Guido van Rossume7ba4952007-06-06 23:52:48 +00001620class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001621 def test_urandom_length(self):
1622 self.assertEqual(len(os.urandom(0)), 0)
1623 self.assertEqual(len(os.urandom(1)), 1)
1624 self.assertEqual(len(os.urandom(10)), 10)
1625 self.assertEqual(len(os.urandom(100)), 100)
1626 self.assertEqual(len(os.urandom(1000)), 1000)
1627
1628 def test_urandom_value(self):
1629 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001630 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001631 data2 = os.urandom(16)
1632 self.assertNotEqual(data1, data2)
1633
1634 def get_urandom_subprocess(self, count):
1635 code = '\n'.join((
1636 'import os, sys',
1637 'data = os.urandom(%s)' % count,
1638 'sys.stdout.buffer.write(data)',
1639 'sys.stdout.buffer.flush()'))
1640 out = assert_python_ok('-c', code)
1641 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001642 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001643 return stdout
1644
1645 def test_urandom_subprocess(self):
1646 data1 = self.get_urandom_subprocess(16)
1647 data2 = self.get_urandom_subprocess(16)
1648 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001649
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001650
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        payload = os.getrandom(16)
        self.assertIsInstance(payload, bytes)
        self.assertEqual(len(payload), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only the presence of the flag is checked.
        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists.
        # BlockingIOError is tolerated: it means the system urandom is not
        # initialized yet.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        # Two consecutive calls must not return the same bytes.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1692
1693
# True when os.urandom() is implemented with getentropy(), the getrandom()
# function or the getrandom() syscall: in those cases it does not use a file
# descriptor.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001700
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Each scenario runs in a child interpreter, because it manipulates
    process-wide state (the descriptor limit or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # No assertion is made on the output here; the child's result is
        # deliberately not bound to unused names.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The decoy file holds only b"x" bytes, so two equal halves would
        # mean urandom() had read from it.
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1779 self.assertNotEqual(out2, out)
1780
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001781
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The value bound by the with statement is the list of
    recorded calls.  os.execv, os.execve and os.defpath are restored on
    exit, whether or not the block raised.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups fails, the finally clause must not run against unbound names
    # and hide the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1814
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family and the internal os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list, or an empty first argument, is rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with a program name of type *test_type*.

        os.execv and os.execve are replaced by recording mocks, so nothing
        is really executed.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            # Either no Windows error code at all, or a non-zero one.
            self.assertTrue(e.winerror is None or e.winerror != 0)
        else:
            self.fail('No OSError raised')
1919
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001920
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist.

    setUp() fails the test outright if support.TESTFN exists.
    """

    def setUp(self):
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the tests below rely on the file being absent.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Unlike the other tests, this one needs the path to exist: mkdir()
        # must fail because a file is already there.  Mode "x" creates it
        # and fails if it already exists.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Victor Stinnere77c9742016-03-25 10:28:23 +01001956
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001957class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001958 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001959 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1960 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001961 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001962 def get_single(f):
1963 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001964 if hasattr(os, f):
1965 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001966 return helper
1967 for f in singles:
1968 locals()["test_"+f] = get_single(f)
1969
Benjamin Peterson7522c742009-01-19 21:00:09 +00001970 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001971 try:
1972 f(support.make_bad_fd(), *args)
1973 except OSError as e:
1974 self.assertEqual(e.errno, errno.EBADF)
1975 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001976 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001977 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001978
Serhiy Storchaka43767632013-11-03 21:31:38 +02001979 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001980 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001981 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001982
Serhiy Storchaka43767632013-11-03 21:31:38 +02001983 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001984 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001985 fd = support.make_bad_fd()
1986 # Make sure none of the descriptors we are about to close are
1987 # currently valid (issue 6542).
1988 for i in range(10):
1989 try: os.fstat(fd+i)
1990 except OSError:
1991 pass
1992 else:
1993 break
1994 if i < 2:
1995 raise unittest.SkipTest(
1996 "Unable to acquire a range of invalid file descriptors")
1997 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001998
Serhiy Storchaka43767632013-11-03 21:31:38 +02001999 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002000 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002001 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002002
Serhiy Storchaka43767632013-11-03 21:31:38 +02002003 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002004 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002005 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002006
Serhiy Storchaka43767632013-11-03 21:31:38 +02002007 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002008 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002009 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002010
Serhiy Storchaka43767632013-11-03 21:31:38 +02002011 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002012 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002013 self.check(os.pathconf, "PC_NAME_MAX")
2014 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002015
Serhiy Storchaka43767632013-11-03 21:31:38 +02002016 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002017 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002018 self.check(os.truncate, 0)
2019 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002020
Serhiy Storchaka43767632013-11-03 21:31:38 +02002021 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002022 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002023 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002024
Serhiy Storchaka43767632013-11-03 21:31:38 +02002025 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002026 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002027 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002028
Victor Stinner57ddf782014-01-08 15:21:28 +01002029 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2030 def test_readv(self):
2031 buf = bytearray(10)
2032 self.check(os.readv, [buf])
2033
Serhiy Storchaka43767632013-11-03 21:31:38 +02002034 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002035 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002036 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002037
Serhiy Storchaka43767632013-11-03 21:31:38 +02002038 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002039 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002040 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002041
@unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
def test_writev(self):
    # os.writev takes a sequence of bytes-like objects; use one buffer.
    buffers = [b'abc']
    self.check(os.writev, buffers)
2045
def test_inheritable(self):
    # Run self.check on the getter (no extra arguments) and then on the
    # setter (with the flag value True).
    for func, extra_args in ((os.get_inheritable, ()),
                             (os.set_inheritable, (True,))):
        self.check(func, *extra_args)
2049
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
def test_blocking(self):
    # Run self.check on the getter (no extra arguments) and then on the
    # setter (with the flag value True).
    for func, extra_args in ((os.get_blocking, ()),
                             (os.set_blocking, (True,))):
        self.check(func, *extra_args)
2055
Brian Curtin1b9df392010-11-24 20:24:31 +00002056
class LinkTests(unittest.TestCase):
    """Create hard links with os.link() using str, bytes and non-ASCII names."""

    def setUp(self):
        # Two sibling names derived from the shared scratch file name.
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files a test actually created.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # refer to the same open file.  The test is skipped when os.link()
        # is refused with PermissionError.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)

        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        source = bytes(self.file1, encoding)
        target = bytes(self.file2, encoding)
        self._test_link(source, target)

    def test_unicode_name(self):
        # Only meaningful when the filesystem encoding can represent the
        # non-ASCII character appended below.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
2093
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Check error handling of the os.set*uid() / os.set*gid() functions.

    Each test verifies that a non-integer argument raises TypeError and a
    value of 1 << 32 raises OverflowError.  The "set to 0 raises OSError"
    checks only run when the process is not running as uid 0 (and, for the
    gid variants, when HAVE_WHEEL_GROUP is false).
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # Must exercise seteuid itself: this test is only guarded by the
        # presence of os.seteuid, and setegid has its own test below.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002161
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Access files whose names were created from encoded bytes names."""

    def setUp(self):
        # Pick the most "difficult" directory name that support offers.
        if support.TESTFN_UNENCODABLE:
            self.dir = support.TESTFN_UNENCODABLE
        elif support.TESTFN_NONASCII:
            self.dir = support.TESTFN_NONASCII
        else:
            self.dir = support.TESTFN
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate names, dropping those that cannot be
        # encoded with the filesystem encoding.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() does not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            no_arg_listing = set(os.listdir())
            explicit_listing = set(os.listdir(os.sep))
            self.assertEqual(no_arg_listing, explicit_listing)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening (and immediately closing) each file must not raise.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002233
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1 seconds apart.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Send console control *event* to a helper script and fail (using
        # *name* in the message) if the script is still running afterwards.
        # A one-byte tagged mmap is polled for the value 1 before the event
        # is sent.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes instead of leaking it.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 is falsy but means the process has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2348
2349
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two small files and
        # remember their names, sorted, for comparison with os.listdir().
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002396
2397
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Call os.readlink() with str, bytes and path-like arguments."""

    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        # Compare two paths (both str or both bytes) after case
        # normalisation; on Windows a leading '\\?\' is ignored.
        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def without_prefix(path):
                prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
                return path[4:] if path.startswith(prefix) else path

            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must still be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link_name in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link_name))

    def test_not_symlink(self):
        pathlike_target = FakePath(self.filelink_target)
        self.assertRaises(OSError, os.readlink, self.filelink_target)
        self.assertRaises(OSError, os.readlink, pathlike_target)

    def test_missing_link(self):
        for missing in ('missing-link', FakePath('missing-link')):
            self.assertRaises(FileNotFoundError, os.readlink, missing)

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        result = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(result, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        result = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(result, self.filelinkb_target)
        self.assertIsInstance(result, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        result = os.readlink(self.filelinkb)
        self.assertPathEqual(result, self.filelinkb_target)
        self.assertIsInstance(result, bytes)
2457
2458
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os and os.path on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so that the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target never exists.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() of the link must equal stat() of the target and differ from
        # lstat() of the link, for both the str and the bytes spelling.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                            and os.path.exists(r'C:\ProgramData'),
                            'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Try str first, then bytes, since that is a separate code path.
            attempts = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in attempts:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    continue
                # The link was created: remove it again, best effort.
                try:
                    os.remove(dest)
                except OSError:
                    pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2635
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Create and remove an NTFS junction via _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so that the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2669
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Handle-leak check for nt._getfinalpathname() and os.stat()."""

    def test_getfinalpathname_handles(self):
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        def open_handle_count():
            # Ask the kernel how many handles this process holds; the
            # call itself must succeed.
            counter = ctypes.wintypes.DWORD()
            ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(counter))
            self.assertEqual(1, ok)
            return counter.value

        before_count = open_handle_count()

        # The \\?\ names are meant to exercise the error path (any
        # exception is ignored below); __file__ tests the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        # Failure is expected
                        pass

        # Whether the calls failed or succeeded, the number of open
        # handles must be unchanged.
        handle_delta = open_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002719
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check a directory symlink whose target is relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Remove the whole tree, including any link the test created in it.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> some_dir (which lives next to the link, in
        base) and ensure that some_link is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` statement is removed
        # when Python runs with -O, which would turn this test into a no-op.
        self.assertTrue(os.path.isdir(src))
2750
2751
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # equal to the input.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here: nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002765
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002766
Brett Cannonefb00c02012-02-29 18:31:31 -05002767
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a name known to codecs.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2781
2782
class PidTests(unittest.TestCase):
    """Tests for os.getppid(), os.waitpid() and os.waitstatus_to_exitcode()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode, callback=None):
        """Spawn a Python child running *code* and check how it exits.

        The child is started with os.spawnv(os.P_NOWAIT, ...).  If *callback*
        is given it is called with the child's pid before waiting.  The status
        from os.waitpid() is converted with os.waitstatus_to_exitcode() and
        compared with *exitcode*.
        """
        if sys.platform != 'win32':
            args = [sys.executable, '-c', code]
        else:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            args = [f'"{sys.executable}"', '-c', f'"{code}"']
        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)

        if callback is not None:
            callback(pid)

        # don't use support.wait_process() to test directly os.waitpid()
        # and os.waitstatus_to_exitcode()
        waited_pid, status = os.waitpid(pid, 0)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertEqual(waited_pid, pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitstatus_to_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

        # A float is not an acceptable status.
        with self.assertRaises(TypeError):
            os.waitstatus_to_exitcode(0.0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
        # with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitstatus_to_exitcode_windows(self):
        max_exitcode = 2 ** 32 - 1
        for expected in (0, 1, 5, max_exitcode):
            status = expected << 8
            self.assertEqual(os.waitstatus_to_exitcode(status), expected)

        # invalid values
        too_large = (max_exitcode + 1) << 8
        with self.assertRaises(ValueError):
            os.waitstatus_to_exitcode(too_large)
        with self.assertRaises(OverflowError):
            os.waitstatus_to_exitcode(-1)

    # Skip the test on Windows
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
    def test_waitstatus_to_exitcode_kill(self):
        signum = signal.SIGKILL
        # The child sleeps; the callback kills it before os.waitpid() runs,
        # and the expected exit code is the negated signal number.
        self.check_waitpid(
            f'import time; time.sleep({support.LONG_TIMEOUT})',
            exitcode=-signum,
            callback=lambda pid: os.kill(pid, signum))
Victor Stinner65a796e2020-04-01 18:49:29 +02002853
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002854
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to support.TESTFN and return the argv to run it.

        The script exits with ``self.exitcode`` (17).  With *with_env* true,
        ``self.env`` is set to a copy of os.environ extended with a unique
        key (``self.key``), and the script reads that key from os.environ
        before exiting.  With *use_bytes* true, the arguments (and the
        environment, when one was created) are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally would raise AttributeError.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* handles unusual environment entries.

        *spawn* is called as ``spawn(mode, path, args, env)``.  For each
        invalid entry it must either raise ValueError or return exit code
        127.  An equal sign in a value is valid and must reach the child
        unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                try:
                    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
                except ValueError:
                    pass
                else:
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3030
Serhiy Storchaka77703942017-06-25 07:33:01 +03003031
Brian Curtin0151b8e2010-09-24 13:43:43 +00003032# The introduction of this TestCase caused at least two different errors on
3033# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
3040
3041
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raise this process's priority value by one and read it back."""
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if not (base >= 19 and observed <= 19):
                self.assertEqual(observed, base + 1)
            else:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
        finally:
            # Try to put the original value back; an EACCES refusal is
            # tolerated, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
3064
3065
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening TCP server that runs an asyncore loop in its own thread.

    Each accepted connection is wrapped in a Handler, which is stored as
    ``handler_instance`` so the data it received can be inspected later.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that greets the peer and buffers its data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read but not stored.
            self.accumulate = True
            self.closed = False
            # Greeting the client can wait for to synchronize with the server.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until its loop has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop to exit and join the server thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even if asyncore.loop() raises.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3150
3151
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # Platforms whose name starts with one of these prefixes are treated as
    # lacking the headers/trailers arguments.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((socket_helper.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried while it fails with EAGAIN or EBUSY; any other
        OSError (ECONNRESET included) is propagated to the caller.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                    offset=0, count=4096)
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                        offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Send the first 5 bytes of the file followed by both trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated with this flag.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003357
3358
def supports_extended_attributes():
    """Return True if os.setxattr() works on a newly created test file.

    Returns False when os.setxattr is missing or when setting a
    ``user.test`` attribute on support.TESTFN raises OSError.  The probe
    file is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe_path = support.TESTFN
    supported = True
    try:
        # "x" mode: creation fails if the file already exists, and that
        # error is deliberately not caught here.
        with open(probe_path, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(probe_path)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003373
3374
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Runs one scenario against os.getxattr/setxattr/removexattr/listxattr,
    # called with paths, with follow_symlinks=False, and with file
    # descriptors.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # *s* converts attribute names (str or os.fsencode); the four
        # callables are the functions under test and **kwargs is forwarded
        # to every get/set/remove call.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with OSError(code).
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone again.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        large_value = b"a"*1024
        setxattr(path, s("user.test"), large_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), large_value)
        removexattr(path, s("user.test"), **kwargs)
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(path, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with bytes names, removing
        # the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each function so that it opens the path and hands the
        # resulting file descriptor to the os-level call.
        def get_by_fd(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def set_by_fd(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def remove_by_fd(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def list_by_fd(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(get_by_fd, set_by_fd, remove_by_fd, list_by_fd)
3458
3459
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            dimensions = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            not_a_terminal = (sys.platform == "win32"
                              or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not not_a_terminal:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(dimensions.columns, 0)
        self.assertGreaterEqual(dimensions.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(
                ["stty", "size"], stderr=subprocess.DEVNULL, text=True
            )
            fields = output.split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        # The two fields are swapped to match get_terminal_size()'s order.
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            not_a_terminal = (sys.platform == "win32"
                              or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not not_a_terminal:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3507
3508
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag.
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        # closefd=False: the descriptor is closed by the cleanup above.
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created without passing flags is non-inheritable too.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3524
3525
class OSErrorTests(unittest.TestCase):
    # Checks that an OSError raised by an os function carries, as its
    # filename attribute, the very object that was passed as the path.

    def setUp(self):
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each row: (filenames to try, function, *extra positional args).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            # On win32 the second path argument matches the type of the first.
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Other path objects (bytearray, memoryview) must
                        # additionally emit a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                    else:
                        func(name, *func_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3618
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3627
Victor Stinnerdaf45552013-08-28 00:53:59 +02003628
class FDInheritanceTests(unittest.TestCase):
    # Checks the inheritable flag of descriptors returned by various os calls.

    def _open_this_file(self):
        # Open this source file read-only; the fd is closed on cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fd_flags = fd_flags & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for end in ends:
            self.addCleanup(os.close, end)
        for end in ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        original = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, original)
        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        target = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, target, inheritable=False), target)
        self.assertFalse(os.get_inheritable(target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        ends = os.openpty()
        for end in ends:
            self.addCleanup(os.close, end)
        for end in ends:
            self.assertEqual(os.get_inheritable(end), False)
3721
3722
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name != 'nt':
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        else:
            # bytes variants are left out when os.name is 'nt'.
            bytes_fspath = bytes_filename = None
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        candidates = [path
                      for path in (str_filename, bytes_filename, str_fspath,
                                   bytes_fspath)
                      if path is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                # Every accepted path flavour must work.
                for path in candidates:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # An __fspath__() that returns an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for bogus, type_name in cases:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(bogus))
3787
Brett Cannon3f9183b2016-08-26 14:44:48 -07003788
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to report blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag after each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3802
3803
Yury Selivanov97e2e062014-09-26 12:33:06 -04003804
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3809
3810
class TestDirEntry(unittest.TestCase):
    # os.DirEntry objects can neither be instantiated directly nor pickled.

    def setUp(self):
        # Work in a fresh directory that is removed after the test.
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # Close the scandir iterator deterministically once it is consumed.
        with os.scandir(self.path) as it:
            entry = list(it).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Pass only the object to pickle.dumps().  Its second positional
        # argument is the pickle protocol; handing it unrelated data can
        # raise TypeError by itself and hide whether the entry is picklable.
        self.assertRaises(TypeError, pickle.dumps, entry)
3827
3828
Victor Stinner6036e442015-03-08 01:58:04 +01003829class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003830 check_no_resource_warning = support.check_no_resource_warning
3831
Victor Stinner6036e442015-03-08 01:58:04 +01003832 def setUp(self):
3833 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003834 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003835 self.addCleanup(support.rmtree, self.path)
3836 os.mkdir(self.path)
3837
3838 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003839 path = self.bytes_path if isinstance(name, bytes) else self.path
3840 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003841 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003842 return filename
3843
3844 def get_entries(self, names):
3845 entries = dict((entry.name, entry)
3846 for entry in os.scandir(self.path))
3847 self.assertEqual(sorted(entries.keys()), names)
3848 return entries
3849
3850 def assert_stat_equal(self, stat1, stat2, skip_fields):
3851 if skip_fields:
3852 for attr in dir(stat1):
3853 if not attr.startswith("st_"):
3854 continue
3855 if attr in ("st_dev", "st_ino", "st_nlink"):
3856 continue
3857 self.assertEqual(getattr(stat1, attr),
3858 getattr(stat2, attr),
3859 (stat1, stat2, attr))
3860 else:
3861 self.assertEqual(stat1, stat2)
3862
Eddie Elizondob3966632019-11-05 07:16:14 -08003863 def test_uninstantiable(self):
3864 scandir_iter = os.scandir(self.path)
3865 self.assertRaises(TypeError, type(scandir_iter))
3866 scandir_iter.close()
3867
3868 def test_unpickable(self):
3869 filename = self.create_file("file.txt")
3870 scandir_iter = os.scandir(self.path)
3871 import pickle
3872 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3873 scandir_iter.close()
3874
Victor Stinner6036e442015-03-08 01:58:04 +01003875 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003876 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003877 self.assertEqual(entry.name, name)
3878 self.assertEqual(entry.path, os.path.join(self.path, name))
3879 self.assertEqual(entry.inode(),
3880 os.stat(entry.path, follow_symlinks=False).st_ino)
3881
3882 entry_stat = os.stat(entry.path)
3883 self.assertEqual(entry.is_dir(),
3884 stat.S_ISDIR(entry_stat.st_mode))
3885 self.assertEqual(entry.is_file(),
3886 stat.S_ISREG(entry_stat.st_mode))
3887 self.assertEqual(entry.is_symlink(),
3888 os.path.islink(entry.path))
3889
3890 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3891 self.assertEqual(entry.is_dir(follow_symlinks=False),
3892 stat.S_ISDIR(entry_lstat.st_mode))
3893 self.assertEqual(entry.is_file(follow_symlinks=False),
3894 stat.S_ISREG(entry_lstat.st_mode))
3895
3896 self.assert_stat_equal(entry.stat(),
3897 entry_stat,
3898 os.name == 'nt' and not is_symlink)
3899 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3900 entry_lstat,
3901 os.name == 'nt')
3902
3903 def test_attributes(self):
3904 link = hasattr(os, 'link')
3905 symlink = support.can_symlink()
3906
3907 dirname = os.path.join(self.path, "dir")
3908 os.mkdir(dirname)
3909 filename = self.create_file("file.txt")
3910 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003911 try:
3912 os.link(filename, os.path.join(self.path, "link_file.txt"))
3913 except PermissionError as e:
3914 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003915 if symlink:
3916 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3917 target_is_directory=True)
3918 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3919
3920 names = ['dir', 'file.txt']
3921 if link:
3922 names.append('link_file.txt')
3923 if symlink:
3924 names.extend(('symlink_dir', 'symlink_file.txt'))
3925 entries = self.get_entries(names)
3926
3927 entry = entries['dir']
3928 self.check_entry(entry, 'dir', True, False, False)
3929
3930 entry = entries['file.txt']
3931 self.check_entry(entry, 'file.txt', False, True, False)
3932
3933 if link:
3934 entry = entries['link_file.txt']
3935 self.check_entry(entry, 'link_file.txt', False, True, False)
3936
3937 if symlink:
3938 entry = entries['symlink_dir']
3939 self.check_entry(entry, 'symlink_dir', True, False, True)
3940
3941 entry = entries['symlink_file.txt']
3942 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3943
3944 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003945 path = self.bytes_path if isinstance(name, bytes) else self.path
3946 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003947 self.assertEqual(len(entries), 1)
3948
3949 entry = entries[0]
3950 self.assertEqual(entry.name, name)
3951 return entry
3952
Brett Cannon96881cd2016-06-10 14:37:21 -07003953 def create_file_entry(self, name='file.txt'):
3954 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003955 return self.get_entry(os.path.basename(filename))
3956
3957 def test_current_directory(self):
3958 filename = self.create_file()
3959 old_dir = os.getcwd()
3960 try:
3961 os.chdir(self.path)
3962
3963 # call scandir() without parameter: it must list the content
3964 # of the current directory
3965 entries = dict((entry.name, entry) for entry in os.scandir())
3966 self.assertEqual(sorted(entries.keys()),
3967 [os.path.basename(filename)])
3968 finally:
3969 os.chdir(old_dir)
3970
3971 def test_repr(self):
3972 entry = self.create_file_entry()
3973 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3974
Brett Cannon96881cd2016-06-10 14:37:21 -07003975 def test_fspath_protocol(self):
3976 entry = self.create_file_entry()
3977 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3978
3979 def test_fspath_protocol_bytes(self):
3980 bytes_filename = os.fsencode('bytesfile.txt')
3981 bytes_entry = self.create_file_entry(name=bytes_filename)
3982 fspath = os.fspath(bytes_entry)
3983 self.assertIsInstance(fspath, bytes)
3984 self.assertEqual(fspath,
3985 os.path.join(os.fsencode(self.path),bytes_filename))
3986
Victor Stinner6036e442015-03-08 01:58:04 +01003987 def test_removed_dir(self):
3988 path = os.path.join(self.path, 'dir')
3989
3990 os.mkdir(path)
3991 entry = self.get_entry('dir')
3992 os.rmdir(path)
3993
3994 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3995 if os.name == 'nt':
3996 self.assertTrue(entry.is_dir())
3997 self.assertFalse(entry.is_file())
3998 self.assertFalse(entry.is_symlink())
3999 if os.name == 'nt':
4000 self.assertRaises(FileNotFoundError, entry.inode)
4001 # don't fail
4002 entry.stat()
4003 entry.stat(follow_symlinks=False)
4004 else:
4005 self.assertGreater(entry.inode(), 0)
4006 self.assertRaises(FileNotFoundError, entry.stat)
4007 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4008
4009 def test_removed_file(self):
4010 entry = self.create_file_entry()
4011 os.unlink(entry.path)
4012
4013 self.assertFalse(entry.is_dir())
4014 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4015 if os.name == 'nt':
4016 self.assertTrue(entry.is_file())
4017 self.assertFalse(entry.is_symlink())
4018 if os.name == 'nt':
4019 self.assertRaises(FileNotFoundError, entry.inode)
4020 # don't fail
4021 entry.stat()
4022 entry.stat(follow_symlinks=False)
4023 else:
4024 self.assertGreater(entry.inode(), 0)
4025 self.assertRaises(FileNotFoundError, entry.stat)
4026 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4027
4028 def test_broken_symlink(self):
4029 if not support.can_symlink():
4030 return self.skipTest('cannot create symbolic link')
4031
4032 filename = self.create_file("file.txt")
4033 os.symlink(filename,
4034 os.path.join(self.path, "symlink.txt"))
4035 entries = self.get_entries(['file.txt', 'symlink.txt'])
4036 entry = entries['symlink.txt']
4037 os.unlink(filename)
4038
4039 self.assertGreater(entry.inode(), 0)
4040 self.assertFalse(entry.is_dir())
4041 self.assertFalse(entry.is_file()) # broken symlink returns False
4042 self.assertFalse(entry.is_dir(follow_symlinks=False))
4043 self.assertFalse(entry.is_file(follow_symlinks=False))
4044 self.assertTrue(entry.is_symlink())
4045 self.assertRaises(FileNotFoundError, entry.stat)
4046 # don't fail
4047 entry.stat(follow_symlinks=False)
4048
4049 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01004050 self.create_file("file.txt")
4051
4052 path_bytes = os.fsencode(self.path)
4053 entries = list(os.scandir(path_bytes))
4054 self.assertEqual(len(entries), 1, entries)
4055 entry = entries[0]
4056
4057 self.assertEqual(entry.name, b'file.txt')
4058 self.assertEqual(entry.path,
4059 os.fsencode(os.path.join(self.path, 'file.txt')))
4060
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03004061 def test_bytes_like(self):
4062 self.create_file("file.txt")
4063
4064 for cls in bytearray, memoryview:
4065 path_bytes = cls(os.fsencode(self.path))
4066 with self.assertWarns(DeprecationWarning):
4067 entries = list(os.scandir(path_bytes))
4068 self.assertEqual(len(entries), 1, entries)
4069 entry = entries[0]
4070
4071 self.assertEqual(entry.name, b'file.txt')
4072 self.assertEqual(entry.path,
4073 os.fsencode(os.path.join(self.path, 'file.txt')))
4074 self.assertIs(type(entry.name), bytes)
4075 self.assertIs(type(entry.path), bytes)
4076
@unittest.skipUnless(os.listdir in os.supports_fd,
                     'fd support for listdir required for this test.')
def test_fd(self):
    # scandir() on a directory file descriptor: entry paths are bare
    # names, and stat results match os.stat() relative to the same fd.
    self.assertIn(os.scandir, os.supports_fd)
    self.create_file('file.txt')
    expected_names = ['file.txt']
    if support.can_symlink():
        os.symlink('file.txt', os.path.join(self.path, 'link'))
        expected_names.append('link')

    # Invariant for the whole test, so evaluate it once up front.
    stat_takes_dir_fd = os.stat in os.supports_dir_fd

    dir_fd = os.open(self.path, os.O_RDONLY)
    try:
        with os.scandir(dir_fd) as scanner:
            found = list(scanner)
        found_names = [item.name for item in found]
        self.assertEqual(sorted(found_names), expected_names)
        self.assertEqual(found_names, os.listdir(dir_fd))
        for item in found:
            self.assertEqual(item.path, item.name)
            self.assertEqual(os.fspath(item), item.name)
            self.assertEqual(item.is_symlink(), item.name == 'link')
            if not stat_takes_dir_fd:
                continue
            # Compare both the symlink-following and the raw stat.
            for follow in (True, False):
                st = os.stat(item.name, dir_fd=dir_fd,
                             follow_symlinks=follow)
                self.assertEqual(item.stat(follow_symlinks=follow), st)
    finally:
        os.close(dir_fd)
4105
def test_empty_path(self):
    # An empty path string must be rejected with FileNotFoundError.
    with self.assertRaises(FileNotFoundError):
        os.scandir('')
4108
def test_consume_iterator_twice(self):
    self.create_file("file.txt")
    scanner = os.scandir(self.path)

    first_pass = list(scanner)
    self.assertEqual(len(first_pass), 1, first_pass)

    # check that consuming the iterator twice doesn't raise exception;
    # the second pass simply yields nothing
    second_pass = list(scanner)
    self.assertEqual(len(second_pass), 0, second_pass)
4119
def test_bad_path_type(self):
    # A float, a dict and a list are all invalid path arguments.
    for bad_path in (1.234, {}, []):
        with self.assertRaises(TypeError):
            os.scandir(bad_path)
4123
def test_close(self):
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    scanner = os.scandir(self.path)
    # Consume only one of the two entries so the iterator is not exhausted.
    next(scanner)
    scanner.close()
    # multiple closes
    scanner.close()
    # An explicitly closed iterator must not warn when it is dropped.
    with self.check_no_resource_warning():
        del scanner
4134
def test_context_manager(self):
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    # Leave the with-block with one entry still unread.
    with os.scandir(self.path) as scanner:
        next(scanner)
    # Dropping the iterator after the with-block must not warn.
    with self.check_no_resource_warning():
        del scanner
4142
def test_context_manager_close(self):
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    # Calling close() inside the with-block, before its exit runs,
    # must not raise.
    with os.scandir(self.path) as scanner:
        next(scanner)
        scanner.close()
4149
def test_context_manager_exception(self):
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)
    # The exception raised inside the with-block must propagate out.
    with self.assertRaises(ZeroDivisionError):
        with os.scandir(self.path) as scanner:
            next(scanner)
            1 / 0
    # Dropping the iterator afterwards must not warn.
    with self.check_no_resource_warning():
        del scanner
4159
def test_resource_warning(self):
    for filename in ("file.txt", "file2.txt"):
        self.create_file(filename)

    # Partially consumed and never closed: dropping it must warn.
    pending = os.scandir(self.path)
    next(pending)
    with self.assertWarns(ResourceWarning):
        del pending
        support.gc_collect()

    # exhausted iterator: dropping it must stay silent
    drained = os.scandir(self.path)
    list(drained)
    with self.check_no_resource_warning():
        del drained
4173
Victor Stinner6036e442015-03-08 01:58:04 +01004174
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol (PEP 519).

    Data-driven loops use subTest() so that a failure names the offending
    input and the remaining inputs are still checked.
    """

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for b in b'hello', b'goodbye', b'some/path/and/file':
            with self.subTest(path=b):
                self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        for s in 'hello', 'goodbye', 'some/path/and/file':
            with self.subTest(path=s):
                self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        for p in "path/like/object", b"path/like/object":
            with self.subTest(path=p):
                pathlike = FakePath(p)

                # fspath() gives back the wrapped value unchanged, while
                # fsencode()/fsdecode() normalise it to bytes/str.
                self.assertEqual(p, self.fspath(pathlike))
                self.assertEqual(b"path/like/object", os.fsencode(pathlike))
                self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertIsInstance(FakePath('x'), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            with self.subTest(obj=o):
                self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004231
Victor Stinnerc29b5852017-11-02 07:28:27 -07004232
class TimesTests(unittest.TestCase):
    def test_times(self):
        # os.times() must return its dedicated result type.
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Each named field is exposed as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for name in field_names:
            self.assertIsInstance(getattr(result, name), float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            for name in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(result, name), 0)
4247
4248
# TestPEP519 exercises os.fspath.  Only define the extra test class below
# when the os module also exposes "_fspath", which is then run through the
# same inherited tests.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Swap in the alternative implementation; every inherited test
        # calls self.fspath, so nothing else needs overriding.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004257
4258
# Run the tests through unittest's command-line runner when this file is
# executed directly.
if __name__ == '__main__':
    unittest.main()