blob: ef2395d87a605cd65fa472bb7ec2794ef6df4f84 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030033from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080034from test.support import threading_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070035from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020036
Antoine Pitrouec34ab52013-08-16 20:44:38 +020037try:
38 import resource
39except ImportError:
40 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020041try:
42 import fcntl
43except ImportError:
44 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010045try:
46 import _winapi
47except ImportError:
48 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020049try:
R David Murrayf2ad1732014-12-25 18:36:56 -050050 import pwd
51 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010052except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050053 all_users = []
54try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020055 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020056except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020057 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020058
Berker Peksagce643912015-05-06 06:33:17 +030059from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020060from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000061
Victor Stinner923590e2016-03-24 09:11:48 +010062
R David Murrayf2ad1732014-12-25 18:36:56 -050063root_in_posix = False
64if hasattr(os, 'geteuid'):
65 root_in_posix = (os.geteuid() == 0)
66
Mark Dickinson7cf03892010-04-16 13:45:35 +000067# Detect whether we're on a Linux system that uses the (now outdated
68# and unmaintained) linuxthreads threading library. There's an issue
69# when combining linuxthreads with a failed execv call: see
70# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020071if hasattr(sys, 'thread_info') and sys.thread_info.version:
72 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
73else:
74 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000075
Stefan Krahebee49a2013-01-17 15:31:00 +010076# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
77HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
78
Victor Stinner923590e2016-03-24 09:11:48 +010079
Berker Peksag4af23d72016-09-15 20:32:44 +030080def requires_os_func(name):
81 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
82
83
Victor Stinnerae39d232016-03-24 17:12:55 +010084def create_file(filename, content=b'content'):
85 with open(filename, "xb", 0) as fp:
86 fp.write(content)
87
88
Victor Stinner689830e2019-06-26 17:31:12 +020089class MiscTests(unittest.TestCase):
90 def test_getcwd(self):
91 cwd = os.getcwd()
92 self.assertIsInstance(cwd, str)
93
Victor Stinnerec3e20a2019-06-28 18:01:59 +020094 def test_getcwd_long_path(self):
95 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
96 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
97 # longer path if longer paths support is enabled. Internally, the os
98 # module uses MAXPATHLEN which is at least 1024.
99 #
100 # Use a directory name of 200 characters to fit into Windows MAX_PATH
101 # limit.
102 #
103 # On Windows, the test can stop when trying to create a path longer
104 # than MAX_PATH if long paths support is disabled:
105 # see RtlAreLongPathsEnabled().
106 min_len = 2000 # characters
107 dirlen = 200 # characters
108 dirname = 'python_test_dir_'
109 dirname = dirname + ('a' * (dirlen - len(dirname)))
110
111 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200112 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200113 expected = path
114
115 while True:
116 cwd = os.getcwd()
117 self.assertEqual(cwd, expected)
118
119 need = min_len - (len(cwd) + len(os.path.sep))
120 if need <= 0:
121 break
122 if len(dirname) > need and need > 0:
123 dirname = dirname[:need]
124
125 path = os.path.join(path, dirname)
126 try:
127 os.mkdir(path)
128 # On Windows, chdir() can fail
129 # even if mkdir() succeeded
130 os.chdir(path)
131 except FileNotFoundError:
132 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
133 # ERROR_FILENAME_EXCED_RANGE (206) errors
134 # ("The filename or extension is too long")
135 break
136 except OSError as exc:
137 if exc.errno == errno.ENAMETOOLONG:
138 break
139 else:
140 raise
141
142 expected = path
143
144 if support.verbose:
145 print(f"Tested current directory length: {len(cwd)}")
146
Victor Stinner689830e2019-06-26 17:31:12 +0200147 def test_getcwdb(self):
148 cwd = os.getcwdb()
149 self.assertIsInstance(cwd, bytes)
150 self.assertEqual(os.fsdecode(cwd), os.getcwd())
151
152
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000153# Tests creating TESTFN
154class FileTests(unittest.TestCase):
155 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000156 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000157 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000158 tearDown = setUp
159
160 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000161 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000162 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000163 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000164
Christian Heimesfdab48e2008-01-20 09:06:41 +0000165 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000166 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
167 # We must allocate two consecutive file descriptors, otherwise
168 # it will mess up other file descriptors (perhaps even the three
169 # standard ones).
170 second = os.dup(first)
171 try:
172 retries = 0
173 while second != first + 1:
174 os.close(first)
175 retries += 1
176 if retries > 10:
177 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000178 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000179 first, second = second, os.dup(second)
180 finally:
181 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000182 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000183 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000184 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000185
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000186 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000187 def test_rename(self):
188 path = support.TESTFN
189 old = sys.getrefcount(path)
190 self.assertRaises(TypeError, os.rename, path, 0)
191 new = sys.getrefcount(path)
192 self.assertEqual(old, new)
193
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000194 def test_read(self):
195 with open(support.TESTFN, "w+b") as fobj:
196 fobj.write(b"spam")
197 fobj.flush()
198 fd = fobj.fileno()
199 os.lseek(fd, 0, 0)
200 s = os.read(fd, 4)
201 self.assertEqual(type(s), bytes)
202 self.assertEqual(s, b"spam")
203
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200204 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200205 # Skip the test on 32-bit platforms: the number of bytes must fit in a
206 # Py_ssize_t type
207 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
208 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200209 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
210 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200211 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100212 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213
214 # Issue #21932: Make sure that os.read() does not raise an
215 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200216 with open(support.TESTFN, "rb") as fp:
217 data = os.read(fp.fileno(), size)
218
Victor Stinner8c663fd2017-11-08 14:44:44 -0800219 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200220 # operating system is free to return less bytes than requested.
221 self.assertEqual(data, b'test')
222
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000223 def test_write(self):
224 # os.write() accepts bytes- and buffer-like objects but not strings
225 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
226 self.assertRaises(TypeError, os.write, fd, "beans")
227 os.write(fd, b"bacon\n")
228 os.write(fd, bytearray(b"eggs\n"))
229 os.write(fd, memoryview(b"spam\n"))
230 os.close(fd)
231 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000232 self.assertEqual(fobj.read().splitlines(),
233 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000234
Victor Stinnere0daff12011-03-20 23:36:35 +0100235 def write_windows_console(self, *args):
236 retcode = subprocess.call(args,
237 # use a new console to not flood the test output
238 creationflags=subprocess.CREATE_NEW_CONSOLE,
239 # use a shell to hide the console window (SW_HIDE)
240 shell=True)
241 self.assertEqual(retcode, 0)
242
243 @unittest.skipUnless(sys.platform == 'win32',
244 'test specific to the Windows console')
245 def test_write_windows_console(self):
246 # Issue #11395: the Windows console returns an error (12: not enough
247 # space error) on writing into stdout if stdout mode is binary and the
248 # length is greater than 66,000 bytes (or less, depending on heap
249 # usage).
250 code = "print('x' * 100000)"
251 self.write_windows_console(sys.executable, "-c", code)
252 self.write_windows_console(sys.executable, "-u", "-c", code)
253
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000254 def fdopen_helper(self, *args):
255 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256 f = os.fdopen(fd, *args)
257 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000258
259 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200260 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
261 os.close(fd)
262
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000263 self.fdopen_helper()
264 self.fdopen_helper('r')
265 self.fdopen_helper('r', 100)
266
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100267 def test_replace(self):
268 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100269 self.addCleanup(support.unlink, support.TESTFN)
270 self.addCleanup(support.unlink, TESTFN2)
271
272 create_file(support.TESTFN, b"1")
273 create_file(TESTFN2, b"2")
274
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100275 os.replace(support.TESTFN, TESTFN2)
276 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
277 with open(TESTFN2, 'r') as f:
278 self.assertEqual(f.read(), "1")
279
Martin Panterbf19d162015-09-09 01:01:13 +0000280 def test_open_keywords(self):
281 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
282 dir_fd=None)
283 os.close(f)
284
285 def test_symlink_keywords(self):
286 symlink = support.get_attribute(os, "symlink")
287 try:
288 symlink(src='target', dst=support.TESTFN,
289 target_is_directory=False, dir_fd=None)
290 except (NotImplementedError, OSError):
291 pass # No OS support or unprivileged user
292
Pablo Galindoaac4d032019-05-31 19:39:47 +0100293 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
294 def test_copy_file_range_invalid_values(self):
295 with self.assertRaises(ValueError):
296 os.copy_file_range(0, 1, -10)
297
298 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
299 def test_copy_file_range(self):
300 TESTFN2 = support.TESTFN + ".3"
301 data = b'0123456789'
302
303 create_file(support.TESTFN, data)
304 self.addCleanup(support.unlink, support.TESTFN)
305
306 in_file = open(support.TESTFN, 'rb')
307 self.addCleanup(in_file.close)
308 in_fd = in_file.fileno()
309
310 out_file = open(TESTFN2, 'w+b')
311 self.addCleanup(support.unlink, TESTFN2)
312 self.addCleanup(out_file.close)
313 out_fd = out_file.fileno()
314
315 try:
316 i = os.copy_file_range(in_fd, out_fd, 5)
317 except OSError as e:
318 # Handle the case in which Python was compiled
319 # in a system with the syscall but without support
320 # in the kernel.
321 if e.errno != errno.ENOSYS:
322 raise
323 self.skipTest(e)
324 else:
325 # The number of copied bytes can be less than
326 # the number of bytes originally requested.
327 self.assertIn(i, range(0, 6));
328
329 with open(TESTFN2, 'rb') as in_file:
330 self.assertEqual(in_file.read(), data[:i])
331
332 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
333 def test_copy_file_range_offset(self):
334 TESTFN4 = support.TESTFN + ".4"
335 data = b'0123456789'
336 bytes_to_copy = 6
337 in_skip = 3
338 out_seek = 5
339
340 create_file(support.TESTFN, data)
341 self.addCleanup(support.unlink, support.TESTFN)
342
343 in_file = open(support.TESTFN, 'rb')
344 self.addCleanup(in_file.close)
345 in_fd = in_file.fileno()
346
347 out_file = open(TESTFN4, 'w+b')
348 self.addCleanup(support.unlink, TESTFN4)
349 self.addCleanup(out_file.close)
350 out_fd = out_file.fileno()
351
352 try:
353 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
354 offset_src=in_skip,
355 offset_dst=out_seek)
356 except OSError as e:
357 # Handle the case in which Python was compiled
358 # in a system with the syscall but without support
359 # in the kernel.
360 if e.errno != errno.ENOSYS:
361 raise
362 self.skipTest(e)
363 else:
364 # The number of copied bytes can be less than
365 # the number of bytes originally requested.
366 self.assertIn(i, range(0, bytes_to_copy+1));
367
368 with open(TESTFN4, 'rb') as in_file:
369 read = in_file.read()
370 # seeked bytes (5) are zero'ed
371 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
372 # 012 are skipped (in_skip)
373 # 345678 are copied in the file (in_skip + bytes_to_copy)
374 self.assertEqual(read[out_seek:],
375 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200376
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000377# Test attributes on return values from os.*stat* family.
378class StatAttributeTests(unittest.TestCase):
379 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200380 self.fname = support.TESTFN
381 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100382 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
Antoine Pitrou38425292010-09-21 18:19:07 +0000384 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000385 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000386
387 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000388 self.assertEqual(result[stat.ST_SIZE], 3)
389 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000390
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000391 # Make sure all the attributes are there
392 members = dir(result)
393 for name in dir(stat):
394 if name[:3] == 'ST_':
395 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000396 if name.endswith("TIME"):
397 def trunc(x): return int(x)
398 else:
399 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000400 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000401 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000402 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000403
Larry Hastings6fe20b32012-04-19 15:07:49 -0700404 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700405 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700406 for name in 'st_atime st_mtime st_ctime'.split():
407 floaty = int(getattr(result, name) * 100000)
408 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700409 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700410
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 try:
412 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200413 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000414 except IndexError:
415 pass
416
417 # Make sure that assignment fails
418 try:
419 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200420 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000421 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000422 pass
423
424 try:
425 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200426 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000427 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000428 pass
429
430 try:
431 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200432 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000433 except AttributeError:
434 pass
435
436 # Use the stat_result constructor with a too-short tuple.
437 try:
438 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200439 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000440 except TypeError:
441 pass
442
Ezio Melotti42da6632011-03-15 05:18:48 +0200443 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000444 try:
445 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
446 except TypeError:
447 pass
448
Antoine Pitrou38425292010-09-21 18:19:07 +0000449 def test_stat_attributes(self):
450 self.check_stat_attributes(self.fname)
451
452 def test_stat_attributes_bytes(self):
453 try:
454 fname = self.fname.encode(sys.getfilesystemencoding())
455 except UnicodeEncodeError:
456 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700457 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000458
Christian Heimes25827622013-10-12 01:27:08 +0200459 def test_stat_result_pickle(self):
460 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200461 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
462 p = pickle.dumps(result, proto)
463 self.assertIn(b'stat_result', p)
464 if proto < 4:
465 self.assertIn(b'cos\nstat_result\n', p)
466 unpickled = pickle.loads(p)
467 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200468
Serhiy Storchaka43767632013-11-03 21:31:38 +0200469 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000470 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700471 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
473 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000474 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000475
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000476 # Make sure all the attributes are there.
477 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
478 'ffree', 'favail', 'flag', 'namemax')
479 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000480 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000481
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100482 self.assertTrue(isinstance(result.f_fsid, int))
483
484 # Test that the size of the tuple doesn't change
485 self.assertEqual(len(result), 10)
486
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000487 # Make sure that assignment really fails
488 try:
489 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200490 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000491 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000492 pass
493
494 try:
495 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200496 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000497 except AttributeError:
498 pass
499
500 # Use the constructor with a too-short tuple.
501 try:
502 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200503 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000504 except TypeError:
505 pass
506
Ezio Melotti42da6632011-03-15 05:18:48 +0200507 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000508 try:
509 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
510 except TypeError:
511 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000512
Christian Heimes25827622013-10-12 01:27:08 +0200513 @unittest.skipUnless(hasattr(os, 'statvfs'),
514 "need os.statvfs()")
515 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700516 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200517
Serhiy Storchakabad12572014-12-15 14:03:42 +0200518 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
519 p = pickle.dumps(result, proto)
520 self.assertIn(b'statvfs_result', p)
521 if proto < 4:
522 self.assertIn(b'cos\nstatvfs_result\n', p)
523 unpickled = pickle.loads(p)
524 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200525
Serhiy Storchaka43767632013-11-03 21:31:38 +0200526 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
527 def test_1686475(self):
528 # Verify that an open file can be stat'ed
529 try:
530 os.stat(r"c:\pagefile.sys")
531 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600532 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 except OSError as e:
534 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000535
Serhiy Storchaka43767632013-11-03 21:31:38 +0200536 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
537 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
538 def test_15261(self):
539 # Verify that stat'ing a closed fd does not cause crash
540 r, w = os.pipe()
541 try:
542 os.stat(r) # should not raise error
543 finally:
544 os.close(r)
545 os.close(w)
546 with self.assertRaises(OSError) as ctx:
547 os.stat(r)
548 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100549
Zachary Ware63f277b2014-06-19 09:46:37 -0500550 def check_file_attributes(self, result):
551 self.assertTrue(hasattr(result, 'st_file_attributes'))
552 self.assertTrue(isinstance(result.st_file_attributes, int))
553 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
554
555 @unittest.skipUnless(sys.platform == "win32",
556 "st_file_attributes is Win32 specific")
557 def test_file_attributes(self):
558 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
559 result = os.stat(self.fname)
560 self.check_file_attributes(result)
561 self.assertEqual(
562 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
563 0)
564
565 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200566 dirname = support.TESTFN + "dir"
567 os.mkdir(dirname)
568 self.addCleanup(os.rmdir, dirname)
569
570 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500571 self.check_file_attributes(result)
572 self.assertEqual(
573 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
574 stat.FILE_ATTRIBUTE_DIRECTORY)
575
Berker Peksag0b4dc482016-09-17 15:49:59 +0300576 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
577 def test_access_denied(self):
578 # Default to FindFirstFile WIN32_FIND_DATA when access is
579 # denied. See issue 28075.
580 # os.environ['TEMP'] should be located on a volume that
581 # supports file ACLs.
582 fname = os.path.join(os.environ['TEMP'], self.fname)
583 self.addCleanup(support.unlink, fname)
584 create_file(fname, b'ABC')
585 # Deny the right to [S]YNCHRONIZE on the file to
586 # force CreateFile to fail with ERROR_ACCESS_DENIED.
587 DETACHED_PROCESS = 8
588 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500589 # bpo-30584: Use security identifier *S-1-5-32-545 instead
590 # of localized "Users" to not depend on the locale.
591 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300592 creationflags=DETACHED_PROCESS
593 )
594 result = os.stat(fname)
595 self.assertNotEqual(result.st_size, 0)
596
Steve Dower772ec0f2019-09-04 14:42:54 -0700597 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
598 def test_stat_block_device(self):
599 # bpo-38030: os.stat fails for block devices
600 # Test a filename like "//./C:"
601 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
602 result = os.stat(fname)
603 self.assertEqual(result.st_mode, stat.S_IFBLK)
604
Victor Stinner47aacc82015-06-12 17:26:23 +0200605
606class UtimeTests(unittest.TestCase):
607 def setUp(self):
608 self.dirname = support.TESTFN
609 self.fname = os.path.join(self.dirname, "f1")
610
611 self.addCleanup(support.rmtree, self.dirname)
612 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100613 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200614
Victor Stinner47aacc82015-06-12 17:26:23 +0200615 def support_subsecond(self, filename):
616 # Heuristic to check if the filesystem supports timestamp with
617 # subsecond resolution: check if float and int timestamps are different
618 st = os.stat(filename)
619 return ((st.st_atime != st[7])
620 or (st.st_mtime != st[8])
621 or (st.st_ctime != st[9]))
622
623 def _test_utime(self, set_time, filename=None):
624 if not filename:
625 filename = self.fname
626
627 support_subsecond = self.support_subsecond(filename)
628 if support_subsecond:
629 # Timestamp with a resolution of 1 microsecond (10^-6).
630 #
631 # The resolution of the C internal function used by os.utime()
632 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
633 # test with a resolution of 1 ns requires more work:
634 # see the issue #15745.
635 atime_ns = 1002003000 # 1.002003 seconds
636 mtime_ns = 4005006000 # 4.005006 seconds
637 else:
638 # use a resolution of 1 second
639 atime_ns = 5 * 10**9
640 mtime_ns = 8 * 10**9
641
642 set_time(filename, (atime_ns, mtime_ns))
643 st = os.stat(filename)
644
645 if support_subsecond:
646 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
647 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
648 else:
649 self.assertEqual(st.st_atime, atime_ns * 1e-9)
650 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
651 self.assertEqual(st.st_atime_ns, atime_ns)
652 self.assertEqual(st.st_mtime_ns, mtime_ns)
653
654 def test_utime(self):
655 def set_time(filename, ns):
656 # test the ns keyword parameter
657 os.utime(filename, ns=ns)
658 self._test_utime(set_time)
659
660 @staticmethod
661 def ns_to_sec(ns):
662 # Convert a number of nanosecond (int) to a number of seconds (float).
663 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
664 # issue, os.utime() rounds towards minus infinity.
665 return (ns * 1e-9) + 0.5e-9
666
667 def test_utime_by_indexed(self):
668 # pass times as floating point seconds as the second indexed parameter
669 def set_time(filename, ns):
670 atime_ns, mtime_ns = ns
671 atime = self.ns_to_sec(atime_ns)
672 mtime = self.ns_to_sec(mtime_ns)
673 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
674 # or utime(time_t)
675 os.utime(filename, (atime, mtime))
676 self._test_utime(set_time)
677
678 def test_utime_by_times(self):
679 def set_time(filename, ns):
680 atime_ns, mtime_ns = ns
681 atime = self.ns_to_sec(atime_ns)
682 mtime = self.ns_to_sec(mtime_ns)
683 # test the times keyword parameter
684 os.utime(filename, times=(atime, mtime))
685 self._test_utime(set_time)
686
687 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
688 "follow_symlinks support for utime required "
689 "for this test.")
690 def test_utime_nofollow_symlinks(self):
691 def set_time(filename, ns):
692 # use follow_symlinks=False to test utimensat(timespec)
693 # or lutimes(timeval)
694 os.utime(filename, ns=ns, follow_symlinks=False)
695 self._test_utime(set_time)
696
697 @unittest.skipUnless(os.utime in os.supports_fd,
698 "fd support for utime required for this test.")
699 def test_utime_fd(self):
700 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100701 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200702 # use a file descriptor to test futimens(timespec)
703 # or futimes(timeval)
704 os.utime(fp.fileno(), ns=ns)
705 self._test_utime(set_time)
706
707 @unittest.skipUnless(os.utime in os.supports_dir_fd,
708 "dir_fd support for utime required for this test.")
709 def test_utime_dir_fd(self):
710 def set_time(filename, ns):
711 dirname, name = os.path.split(filename)
712 dirfd = os.open(dirname, os.O_RDONLY)
713 try:
714 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
715 os.utime(name, dir_fd=dirfd, ns=ns)
716 finally:
717 os.close(dirfd)
718 self._test_utime(set_time)
719
720 def test_utime_directory(self):
721 def set_time(filename, ns):
722 # test calling os.utime() on a directory
723 os.utime(filename, ns=ns)
724 self._test_utime(set_time, filename=self.dirname)
725
726 def _test_utime_current(self, set_time):
727 # Get the system clock
728 current = time.time()
729
730 # Call os.utime() to set the timestamp to the current system clock
731 set_time(self.fname)
732
733 if not self.support_subsecond(self.fname):
734 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200736 # On Windows, the usual resolution of time.time() is 15.6 ms.
737 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700738 #
739 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
740 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200741 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200742 st = os.stat(self.fname)
743 msg = ("st_time=%r, current=%r, dt=%r"
744 % (st.st_mtime, current, st.st_mtime - current))
745 self.assertAlmostEqual(st.st_mtime, current,
746 delta=delta, msg=msg)
747
748 def test_utime_current(self):
749 def set_time(filename):
750 # Set to the current time in the new way
751 os.utime(self.fname)
752 self._test_utime_current(set_time)
753
754 def test_utime_current_old(self):
755 def set_time(filename):
756 # Set to the current time in the old explicit way.
757 os.utime(self.fname, None)
758 self._test_utime_current(set_time)
759
760 def get_file_system(self, path):
761 if sys.platform == 'win32':
762 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
763 import ctypes
764 kernel32 = ctypes.windll.kernel32
765 buf = ctypes.create_unicode_buffer("", 100)
766 ok = kernel32.GetVolumeInformationW(root, None, 0,
767 None, None, None,
768 buf, len(buf))
769 if ok:
770 return buf.value
771 # return None if the filesystem is unknown
772
773 def test_large_time(self):
774 # Many filesystems are limited to the year 2038. At least, the test
775 # pass with NTFS filesystem.
776 if self.get_file_system(self.dirname) != "NTFS":
777 self.skipTest("requires NTFS")
778
779 large = 5000000000 # some day in 2128
780 os.utime(self.fname, (large, large))
781 self.assertEqual(os.stat(self.fname).st_mtime, large)
782
783 def test_utime_invalid_arguments(self):
784 # seconds and nanoseconds parameters are mutually exclusive
785 with self.assertRaises(ValueError):
786 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200787 with self.assertRaises(TypeError):
788 os.utime(self.fname, [5, 5])
789 with self.assertRaises(TypeError):
790 os.utime(self.fname, (5,))
791 with self.assertRaises(TypeError):
792 os.utime(self.fname, (5, 5, 5))
793 with self.assertRaises(TypeError):
794 os.utime(self.fname, ns=[5, 5])
795 with self.assertRaises(TypeError):
796 os.utime(self.fname, ns=(5,))
797 with self.assertRaises(TypeError):
798 os.utime(self.fname, ns=(5, 5, 5))
799
800 if os.utime not in os.supports_follow_symlinks:
801 with self.assertRaises(NotImplementedError):
802 os.utime(self.fname, (5, 5), follow_symlinks=False)
803 if os.utime not in os.supports_fd:
804 with open(self.fname, 'wb', 0) as fp:
805 with self.assertRaises(TypeError):
806 os.utime(fp.fileno(), (5, 5))
807 if os.utime not in os.supports_dir_fd:
808 with self.assertRaises(NotImplementedError):
809 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200810
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300811 @support.cpython_only
812 def test_issue31577(self):
813 # The interpreter shouldn't crash in case utime() received a bad
814 # ns argument.
815 def get_bad_int(divmod_ret_val):
816 class BadInt:
817 def __divmod__(*args):
818 return divmod_ret_val
819 return BadInt()
820 with self.assertRaises(TypeError):
821 os.utime(self.fname, ns=(get_bad_int(42), 1))
822 with self.assertRaises(TypeError):
823 os.utime(self.fname, ns=(get_bad_int(()), 1))
824 with self.assertRaises(TypeError):
825 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
826
Victor Stinner47aacc82015-06-12 17:26:23 +0200827
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000828from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000829
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000830class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000831 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000832 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000833
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000834 def setUp(self):
835 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000836 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000837 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000838 for key, value in self._reference().items():
839 os.environ[key] = value
840
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000841 def tearDown(self):
842 os.environ.clear()
843 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000844 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000845 os.environb.clear()
846 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000847
Christian Heimes90333392007-11-01 19:08:42 +0000848 def _reference(self):
849 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
850
851 def _empty_mapping(self):
852 os.environ.clear()
853 return os.environ
854
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000855 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200856 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
857 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000858 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000859 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300860 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200861 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300862 value = popen.read().strip()
863 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000864
Xavier de Gayed1415312016-07-22 12:15:29 +0200865 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
866 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000867 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200868 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
869 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300870 it = iter(popen)
871 self.assertEqual(next(it), "line1\n")
872 self.assertEqual(next(it), "line2\n")
873 self.assertEqual(next(it), "line3\n")
874 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000875
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000876 # Verify environ keys and values from the OS are of the
877 # correct str type.
878 def test_keyvalue_types(self):
879 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000880 self.assertEqual(type(key), str)
881 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000882
Christian Heimes90333392007-11-01 19:08:42 +0000883 def test_items(self):
884 for key, value in self._reference().items():
885 self.assertEqual(os.environ.get(key), value)
886
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000887 # Issue 7310
888 def test___repr__(self):
889 """Check that the repr() of os.environ looks like environ({...})."""
890 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000891 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
892 '{!r}: {!r}'.format(key, value)
893 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000894
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000895 def test_get_exec_path(self):
896 defpath_list = os.defpath.split(os.pathsep)
897 test_path = ['/monty', '/python', '', '/flying/circus']
898 test_env = {'PATH': os.pathsep.join(test_path)}
899
900 saved_environ = os.environ
901 try:
902 os.environ = dict(test_env)
903 # Test that defaulting to os.environ works.
904 self.assertSequenceEqual(test_path, os.get_exec_path())
905 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
906 finally:
907 os.environ = saved_environ
908
909 # No PATH environment variable
910 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
911 # Empty PATH environment variable
912 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
913 # Supplied PATH environment variable
914 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
915
Victor Stinnerb745a742010-05-18 17:17:23 +0000916 if os.supports_bytes_environ:
917 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000918 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000919 # ignore BytesWarning warning
920 with warnings.catch_warnings(record=True):
921 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000922 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000923 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000924 pass
925 else:
926 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000927
928 # bytes key and/or value
929 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
930 ['abc'])
931 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
932 ['abc'])
933 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
934 ['abc'])
935
936 @unittest.skipUnless(os.supports_bytes_environ,
937 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000938 def test_environb(self):
939 # os.environ -> os.environb
940 value = 'euro\u20ac'
941 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000942 value_bytes = value.encode(sys.getfilesystemencoding(),
943 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000944 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000945 msg = "U+20AC character is not encodable to %s" % (
946 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000947 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000948 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000949 self.assertEqual(os.environ['unicode'], value)
950 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000951
952 # os.environb -> os.environ
953 value = b'\xff'
954 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000955 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000956 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000957 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000958
Victor Stinner161e7b32020-01-24 11:53:44 +0100959 def test_putenv_unsetenv(self):
960 name = "PYTHONTESTVAR"
961 value = "testvalue"
962 code = f'import os; print(repr(os.environ.get({name!r})))'
963
964 with support.EnvironmentVarGuard() as env:
965 env.pop(name, None)
966
967 os.putenv(name, value)
968 proc = subprocess.run([sys.executable, '-c', code], check=True,
969 stdout=subprocess.PIPE, text=True)
970 self.assertEqual(proc.stdout.rstrip(), repr(value))
971
972 os.unsetenv(name)
973 proc = subprocess.run([sys.executable, '-c', code], check=True,
974 stdout=subprocess.PIPE, text=True)
975 self.assertEqual(proc.stdout.rstrip(), repr(None))
976
Victor Stinner13ff2452018-01-22 18:32:50 +0100977 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100978 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +0100979 def test_putenv_unsetenv_error(self):
980 # Empty variable name is invalid.
981 # "=" and null character are not allowed in a variable name.
982 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
983 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
984 self.assertRaises((OSError, ValueError), os.unsetenv, name)
985
Victor Stinnerb73dd022020-01-22 21:11:17 +0100986 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +0100987 # On Windows, an environment variable string ("name=value" string)
988 # is limited to 32,767 characters
989 longstr = 'x' * 32_768
990 self.assertRaises(ValueError, os.putenv, longstr, "1")
991 self.assertRaises(ValueError, os.putenv, "X", longstr)
992 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +0100993
Victor Stinner6d101392013-04-14 16:35:04 +0200994 def test_key_type(self):
995 missing = 'missingkey'
996 self.assertNotIn(missing, os.environ)
997
Victor Stinner839e5ea2013-04-14 16:43:03 +0200998 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200999 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001000 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001001 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001002
Victor Stinner839e5ea2013-04-14 16:43:03 +02001003 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001004 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001005 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001006 self.assertTrue(cm.exception.__suppress_context__)
1007
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001008 def _test_environ_iteration(self, collection):
1009 iterator = iter(collection)
1010 new_key = "__new_key__"
1011
1012 next(iterator) # start iteration over os.environ.items
1013
1014 # add a new key in os.environ mapping
1015 os.environ[new_key] = "test_environ_iteration"
1016
1017 try:
1018 next(iterator) # force iteration over modified mapping
1019 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1020 finally:
1021 del os.environ[new_key]
1022
1023 def test_iter_error_when_changing_os_environ(self):
1024 self._test_environ_iteration(os.environ)
1025
1026 def test_iter_error_when_changing_os_environ_items(self):
1027 self._test_environ_iteration(os.environ.items())
1028
1029 def test_iter_error_when_changing_os_environ_values(self):
1030 self._test_environ_iteration(os.environ.values())
1031
Charles Burklandd648ef12020-03-13 09:04:43 -07001032 def _test_underlying_process_env(self, var, expected):
1033 if not (unix_shell and os.path.exists(unix_shell)):
1034 return
1035
1036 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1037 value = popen.read().strip()
1038
1039 self.assertEqual(expected, value)
1040
1041 def test_or_operator(self):
1042 overridden_key = '_TEST_VAR_'
1043 original_value = 'original_value'
1044 os.environ[overridden_key] = original_value
1045
1046 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1047 expected = dict(os.environ)
1048 expected.update(new_vars_dict)
1049
1050 actual = os.environ | new_vars_dict
1051 self.assertDictEqual(expected, actual)
1052 self.assertEqual('3', actual[overridden_key])
1053
1054 new_vars_items = new_vars_dict.items()
1055 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1056
1057 self._test_underlying_process_env('_A_', '')
1058 self._test_underlying_process_env(overridden_key, original_value)
1059
1060 def test_ior_operator(self):
1061 overridden_key = '_TEST_VAR_'
1062 os.environ[overridden_key] = 'original_value'
1063
1064 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1065 expected = dict(os.environ)
1066 expected.update(new_vars_dict)
1067
1068 os.environ |= new_vars_dict
1069 self.assertEqual(expected, os.environ)
1070 self.assertEqual('3', os.environ[overridden_key])
1071
1072 self._test_underlying_process_env('_A_', '1')
1073 self._test_underlying_process_env(overridden_key, '3')
1074
1075 def test_ior_operator_invalid_dicts(self):
1076 os_environ_copy = os.environ.copy()
1077 with self.assertRaises(TypeError):
1078 dict_with_bad_key = {1: '_A_'}
1079 os.environ |= dict_with_bad_key
1080
1081 with self.assertRaises(TypeError):
1082 dict_with_bad_val = {'_A_': 1}
1083 os.environ |= dict_with_bad_val
1084
1085 # Check nothing was added.
1086 self.assertEqual(os_environ_copy, os.environ)
1087
1088 def test_ior_operator_key_value_iterable(self):
1089 overridden_key = '_TEST_VAR_'
1090 os.environ[overridden_key] = 'original_value'
1091
1092 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1093 expected = dict(os.environ)
1094 expected.update(new_vars_items)
1095
1096 os.environ |= new_vars_items
1097 self.assertEqual(expected, os.environ)
1098 self.assertEqual('3', os.environ[overridden_key])
1099
1100 self._test_underlying_process_env('_A_', '1')
1101 self._test_underlying_process_env(overridden_key, '3')
1102
1103 def test_ror_operator(self):
1104 overridden_key = '_TEST_VAR_'
1105 original_value = 'original_value'
1106 os.environ[overridden_key] = original_value
1107
1108 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1109 expected = dict(new_vars_dict)
1110 expected.update(os.environ)
1111
1112 actual = new_vars_dict | os.environ
1113 self.assertDictEqual(expected, actual)
1114 self.assertEqual(original_value, actual[overridden_key])
1115
1116 new_vars_items = new_vars_dict.items()
1117 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1118
1119 self._test_underlying_process_env('_A_', '')
1120 self._test_underlying_process_env(overridden_key, original_value)
1121
Victor Stinner6d101392013-04-14 16:35:04 +02001122
Tim Petersc4e09402003-04-25 07:11:48 +00001123class WalkTests(unittest.TestCase):
1124 """Tests for os.walk()."""
1125
Victor Stinner0561c532015-03-12 10:28:24 +01001126 # Wrapper to hide minor differences between os.walk and os.fwalk
1127 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001128 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001129 if 'follow_symlinks' in kwargs:
1130 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001131 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001132
Charles-François Natali7372b062012-02-05 15:15:38 +01001133 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001134 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001135 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001136
1137 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001138 # TESTFN/
1139 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001140 # tmp1
1141 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001142 # tmp2
1143 # SUB11/ no kids
1144 # SUB2/ a file kid and a dirsymlink kid
1145 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001146 # SUB21/ not readable
1147 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001148 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001149 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001150 # broken_link2
1151 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001152 # TEST2/
1153 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001154 self.walk_path = join(support.TESTFN, "TEST1")
1155 self.sub1_path = join(self.walk_path, "SUB1")
1156 self.sub11_path = join(self.sub1_path, "SUB11")
1157 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001158 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001159 tmp1_path = join(self.walk_path, "tmp1")
1160 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001161 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001162 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001163 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001164 t2_path = join(support.TESTFN, "TEST2")
1165 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001166 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001167 broken_link2_path = join(sub2_path, "broken_link2")
1168 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001169
1170 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001171 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001172 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001173 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001174 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001175
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001176 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03001177 with open(path, "x", encoding='utf-8') as f:
Victor Stinnere77c9742016-03-25 10:28:23 +01001178 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001179
Victor Stinner0561c532015-03-12 10:28:24 +01001180 if support.can_symlink():
1181 os.symlink(os.path.abspath(t2_path), self.link_path)
1182 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001183 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1184 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001185 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001186 ["broken_link", "broken_link2", "broken_link3",
1187 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001188 else:
pxinwr3e028b22019-02-15 13:04:47 +08001189 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001190
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001191 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001192 try:
1193 os.listdir(sub21_path)
1194 except PermissionError:
1195 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1196 else:
1197 os.chmod(sub21_path, stat.S_IRWXU)
1198 os.unlink(tmp5_path)
1199 os.rmdir(sub21_path)
1200 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001201
Victor Stinner0561c532015-03-12 10:28:24 +01001202 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001203 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001204 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001205
Tim Petersc4e09402003-04-25 07:11:48 +00001206 self.assertEqual(len(all), 4)
1207 # We can't know which order SUB1 and SUB2 will appear in.
1208 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1209 # flipped: TESTFN, SUB2, SUB1, SUB11
1210 flipped = all[0][1][0] != "SUB1"
1211 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001212 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001213 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001214 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1215 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1216 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1217 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001218
Brett Cannon3f9183b2016-08-26 14:44:48 -07001219 def test_walk_prune(self, walk_path=None):
1220 if walk_path is None:
1221 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001222 # Prune the search.
1223 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001224 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001225 all.append((root, dirs, files))
1226 # Don't descend into SUB1.
1227 if 'SUB1' in dirs:
1228 # Note that this also mutates the dirs we appended to all!
1229 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001230
Victor Stinner0561c532015-03-12 10:28:24 +01001231 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001232 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001233
1234 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001235 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001236 self.assertEqual(all[1], self.sub2_tree)
1237
Brett Cannon3f9183b2016-08-26 14:44:48 -07001238 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001239 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001240
Victor Stinner0561c532015-03-12 10:28:24 +01001241 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001242 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001243 all = list(self.walk(self.walk_path, topdown=False))
1244
Victor Stinner53b0a412016-03-26 01:12:36 +01001245 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001246 # We can't know which order SUB1 and SUB2 will appear in.
1247 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1248 # flipped: SUB2, SUB11, SUB1, TESTFN
1249 flipped = all[3][1][0] != "SUB1"
1250 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001251 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001252 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001253 self.assertEqual(all[3],
1254 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1255 self.assertEqual(all[flipped],
1256 (self.sub11_path, [], []))
1257 self.assertEqual(all[flipped + 1],
1258 (self.sub1_path, ["SUB11"], ["tmp2"]))
1259 self.assertEqual(all[2 - 2 * flipped],
1260 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001261
Victor Stinner0561c532015-03-12 10:28:24 +01001262 def test_walk_symlink(self):
1263 if not support.can_symlink():
1264 self.skipTest("need symlink support")
1265
1266 # Walk, following symlinks.
1267 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1268 for root, dirs, files in walk_it:
1269 if root == self.link_path:
1270 self.assertEqual(dirs, [])
1271 self.assertEqual(files, ["tmp4"])
1272 break
1273 else:
1274 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001275
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001276 def test_walk_bad_dir(self):
1277 # Walk top-down.
1278 errors = []
1279 walk_it = self.walk(self.walk_path, onerror=errors.append)
1280 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001281 self.assertEqual(errors, [])
1282 dir1 = 'SUB1'
1283 path1 = os.path.join(root, dir1)
1284 path1new = os.path.join(root, dir1 + '.new')
1285 os.rename(path1, path1new)
1286 try:
1287 roots = [r for r, d, f in walk_it]
1288 self.assertTrue(errors)
1289 self.assertNotIn(path1, roots)
1290 self.assertNotIn(path1new, roots)
1291 for dir2 in dirs:
1292 if dir2 != dir1:
1293 self.assertIn(os.path.join(root, dir2), roots)
1294 finally:
1295 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001296
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001297 def test_walk_many_open_files(self):
1298 depth = 30
1299 base = os.path.join(support.TESTFN, 'deep')
1300 p = os.path.join(base, *(['d']*depth))
1301 os.makedirs(p)
1302
1303 iters = [self.walk(base, topdown=False) for j in range(100)]
1304 for i in range(depth + 1):
1305 expected = (p, ['d'] if i else [], [])
1306 for it in iters:
1307 self.assertEqual(next(it), expected)
1308 p = os.path.dirname(p)
1309
1310 iters = [self.walk(base, topdown=True) for j in range(100)]
1311 p = base
1312 for i in range(depth + 1):
1313 expected = (p, ['d'] if i < depth else [], [])
1314 for it in iters:
1315 self.assertEqual(next(it), expected)
1316 p = os.path.join(p, 'd')
1317
Charles-François Natali7372b062012-02-05 15:15:38 +01001318
1319@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1320class FwalkTests(WalkTests):
1321 """Tests for os.fwalk()."""
1322
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001323 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001324 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001325 yield (root, dirs, files)
1326
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001327 def fwalk(self, *args, **kwargs):
1328 return os.fwalk(*args, **kwargs)
1329
Larry Hastingsc48fe982012-06-25 04:49:05 -07001330 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1331 """
1332 compare with walk() results.
1333 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001334 walk_kwargs = walk_kwargs.copy()
1335 fwalk_kwargs = fwalk_kwargs.copy()
1336 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1337 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1338 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001339
Charles-François Natali7372b062012-02-05 15:15:38 +01001340 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001341 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001342 expected[root] = (set(dirs), set(files))
1343
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001344 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001345 self.assertIn(root, expected)
1346 self.assertEqual(expected[root], (set(dirs), set(files)))
1347
Larry Hastingsc48fe982012-06-25 04:49:05 -07001348 def test_compare_to_walk(self):
1349 kwargs = {'top': support.TESTFN}
1350 self._compare_to_walk(kwargs, kwargs)
1351
Charles-François Natali7372b062012-02-05 15:15:38 +01001352 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001353 try:
1354 fd = os.open(".", os.O_RDONLY)
1355 walk_kwargs = {'top': support.TESTFN}
1356 fwalk_kwargs = walk_kwargs.copy()
1357 fwalk_kwargs['dir_fd'] = fd
1358 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1359 finally:
1360 os.close(fd)
1361
1362 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001363 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001364 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1365 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001366 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001367 # check that the FD is valid
1368 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001369 # redundant check
1370 os.stat(rootfd)
1371 # check that listdir() returns consistent information
1372 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001373
1374 def test_fd_leak(self):
1375 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1376 # we both check that calling fwalk() a large number of times doesn't
1377 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1378 minfd = os.dup(1)
1379 os.close(minfd)
1380 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001381 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001382 pass
1383 newfd = os.dup(1)
1384 self.addCleanup(os.close, newfd)
1385 self.assertEqual(newfd, minfd)
1386
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001387 # fwalk() keeps file descriptors open
1388 test_walk_many_open_files = None
1389
1390
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001391class BytesWalkTests(WalkTests):
1392 """Tests for os.walk() with bytes."""
1393 def walk(self, top, **kwargs):
1394 if 'follow_symlinks' in kwargs:
1395 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1396 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1397 root = os.fsdecode(broot)
1398 dirs = list(map(os.fsdecode, bdirs))
1399 files = list(map(os.fsdecode, bfiles))
1400 yield (root, dirs, files)
1401 bdirs[:] = list(map(os.fsencode, dirs))
1402 bfiles[:] = list(map(os.fsencode, files))
1403
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001404@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1405class BytesFwalkTests(FwalkTests):
1406 """Tests for os.walk() with bytes."""
1407 def fwalk(self, top='.', *args, **kwargs):
1408 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1409 root = os.fsdecode(broot)
1410 dirs = list(map(os.fsdecode, bdirs))
1411 files = list(map(os.fsdecode, bfiles))
1412 yield (root, dirs, files, topfd)
1413 bdirs[:] = list(map(os.fsencode, dirs))
1414 bfiles[:] = list(map(os.fsencode, files))
1415
Charles-François Natali7372b062012-02-05 15:15:38 +01001416
Guido van Rossume7ba4952007-06-06 23:52:48 +00001417class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001418 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001419 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001420
1421 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001422 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001423 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1424 os.makedirs(path) # Should work
1425 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1426 os.makedirs(path)
1427
1428 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001429 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001430 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1431 os.makedirs(path)
1432 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1433 'dir5', 'dir6')
1434 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001435
Serhiy Storchakae304e332017-03-24 13:27:42 +02001436 def test_mode(self):
1437 with support.temp_umask(0o002):
1438 base = support.TESTFN
1439 parent = os.path.join(base, 'dir1')
1440 path = os.path.join(parent, 'dir2')
1441 os.makedirs(path, 0o555)
1442 self.assertTrue(os.path.exists(path))
1443 self.assertTrue(os.path.isdir(path))
1444 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001445 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1446 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001447
Terry Reedy5a22b652010-12-02 07:05:56 +00001448 def test_exist_ok_existing_directory(self):
1449 path = os.path.join(support.TESTFN, 'dir1')
1450 mode = 0o777
1451 old_mask = os.umask(0o022)
1452 os.makedirs(path, mode)
1453 self.assertRaises(OSError, os.makedirs, path, mode)
1454 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001455 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001456 os.makedirs(path, mode=mode, exist_ok=True)
1457 os.umask(old_mask)
1458
Martin Pantera82642f2015-11-19 04:48:44 +00001459 # Issue #25583: A drive root could raise PermissionError on Windows
1460 os.makedirs(os.path.abspath('/'), exist_ok=True)
1461
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001462 def test_exist_ok_s_isgid_directory(self):
1463 path = os.path.join(support.TESTFN, 'dir1')
1464 S_ISGID = stat.S_ISGID
1465 mode = 0o777
1466 old_mask = os.umask(0o022)
1467 try:
1468 existing_testfn_mode = stat.S_IMODE(
1469 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001470 try:
1471 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001472 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001473 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001474 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1475 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1476 # The os should apply S_ISGID from the parent dir for us, but
1477 # this test need not depend on that behavior. Be explicit.
1478 os.makedirs(path, mode | S_ISGID)
1479 # http://bugs.python.org/issue14992
1480 # Should not fail when the bit is already set.
1481 os.makedirs(path, mode, exist_ok=True)
1482 # remove the bit.
1483 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001484 # May work even when the bit is not already set when demanded.
1485 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001486 finally:
1487 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001488
1489 def test_exist_ok_existing_regular_file(self):
1490 base = support.TESTFN
1491 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001492 with open(path, 'w') as f:
1493 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001494 self.assertRaises(OSError, os.makedirs, path)
1495 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1496 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1497 os.remove(path)
1498
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001499 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001500 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001501 'dir4', 'dir5', 'dir6')
1502 # If the tests failed, the bottom-most directory ('../dir6')
1503 # may not have been created, so we look for the outermost directory
1504 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001505 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001506 path = os.path.dirname(path)
1507
1508 os.removedirs(path)
1509
Andrew Svetlov405faed2012-12-25 12:18:09 +02001510
R David Murrayf2ad1732014-12-25 18:36:56 -05001511@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1512class ChownFileTests(unittest.TestCase):
1513
Berker Peksag036a71b2015-07-21 09:29:48 +03001514 @classmethod
1515 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001516 os.mkdir(support.TESTFN)
1517
1518 def test_chown_uid_gid_arguments_must_be_index(self):
1519 stat = os.stat(support.TESTFN)
1520 uid = stat.st_uid
1521 gid = stat.st_gid
1522 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1523 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1524 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1525 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1526 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1527
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001528 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1529 def test_chown_gid(self):
1530 groups = os.getgroups()
1531 if len(groups) < 2:
1532 self.skipTest("test needs at least 2 groups")
1533
R David Murrayf2ad1732014-12-25 18:36:56 -05001534 gid_1, gid_2 = groups[:2]
1535 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001536
R David Murrayf2ad1732014-12-25 18:36:56 -05001537 os.chown(support.TESTFN, uid, gid_1)
1538 gid = os.stat(support.TESTFN).st_gid
1539 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001540
R David Murrayf2ad1732014-12-25 18:36:56 -05001541 os.chown(support.TESTFN, uid, gid_2)
1542 gid = os.stat(support.TESTFN).st_gid
1543 self.assertEqual(gid, gid_2)
1544
1545 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1546 "test needs root privilege and more than one user")
1547 def test_chown_with_root(self):
1548 uid_1, uid_2 = all_users[:2]
1549 gid = os.stat(support.TESTFN).st_gid
1550 os.chown(support.TESTFN, uid_1, gid)
1551 uid = os.stat(support.TESTFN).st_uid
1552 self.assertEqual(uid, uid_1)
1553 os.chown(support.TESTFN, uid_2, gid)
1554 uid = os.stat(support.TESTFN).st_uid
1555 self.assertEqual(uid, uid_2)
1556
1557 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1558 "test needs non-root account and more than one user")
1559 def test_chown_without_permission(self):
1560 uid_1, uid_2 = all_users[:2]
1561 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001562 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001563 os.chown(support.TESTFN, uid_1, gid)
1564 os.chown(support.TESTFN, uid_2, gid)
1565
Berker Peksag036a71b2015-07-21 09:29:48 +03001566 @classmethod
1567 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001568 os.rmdir(support.TESTFN)
1569
1570
Andrew Svetlov405faed2012-12-25 12:18:09 +02001571class RemoveDirsTests(unittest.TestCase):
1572 def setUp(self):
1573 os.makedirs(support.TESTFN)
1574
1575 def tearDown(self):
1576 support.rmtree(support.TESTFN)
1577
1578 def test_remove_all(self):
1579 dira = os.path.join(support.TESTFN, 'dira')
1580 os.mkdir(dira)
1581 dirb = os.path.join(dira, 'dirb')
1582 os.mkdir(dirb)
1583 os.removedirs(dirb)
1584 self.assertFalse(os.path.exists(dirb))
1585 self.assertFalse(os.path.exists(dira))
1586 self.assertFalse(os.path.exists(support.TESTFN))
1587
1588 def test_remove_partial(self):
1589 dira = os.path.join(support.TESTFN, 'dira')
1590 os.mkdir(dira)
1591 dirb = os.path.join(dira, 'dirb')
1592 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001593 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001594 os.removedirs(dirb)
1595 self.assertFalse(os.path.exists(dirb))
1596 self.assertTrue(os.path.exists(dira))
1597 self.assertTrue(os.path.exists(support.TESTFN))
1598
1599 def test_remove_nothing(self):
1600 dira = os.path.join(support.TESTFN, 'dira')
1601 os.mkdir(dira)
1602 dirb = os.path.join(dira, 'dirb')
1603 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001604 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001605 with self.assertRaises(OSError):
1606 os.removedirs(dirb)
1607 self.assertTrue(os.path.exists(dirb))
1608 self.assertTrue(os.path.exists(dira))
1609 self.assertTrue(os.path.exists(support.TESTFN))
1610
1611
Guido van Rossume7ba4952007-06-06 23:52:48 +00001612class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001613 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001614 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001615 f.write(b'hello')
1616 f.close()
1617 with open(os.devnull, 'rb') as f:
1618 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001619
Andrew Svetlov405faed2012-12-25 12:18:09 +02001620
Guido van Rossume7ba4952007-06-06 23:52:48 +00001621class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001622 def test_urandom_length(self):
1623 self.assertEqual(len(os.urandom(0)), 0)
1624 self.assertEqual(len(os.urandom(1)), 1)
1625 self.assertEqual(len(os.urandom(10)), 10)
1626 self.assertEqual(len(os.urandom(100)), 100)
1627 self.assertEqual(len(os.urandom(1000)), 1000)
1628
1629 def test_urandom_value(self):
1630 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001631 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001632 data2 = os.urandom(16)
1633 self.assertNotEqual(data1, data2)
1634
1635 def get_urandom_subprocess(self, count):
1636 code = '\n'.join((
1637 'import os, sys',
1638 'data = os.urandom(%s)' % count,
1639 'sys.stdout.buffer.write(data)',
1640 'sys.stdout.buffer.flush()'))
1641 out = assert_python_ok('-c', code)
1642 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001643 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001644 return stdout
1645
1646 def test_urandom_subprocess(self):
1647 data1 = self.get_urandom_subprocess(16)
1648 data2 = self.get_urandom_subprocess(16)
1649 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001650
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001651
Victor Stinner9b1f4742016-09-06 16:18:52 -07001652@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1653class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001654 @classmethod
1655 def setUpClass(cls):
1656 try:
1657 os.getrandom(1)
1658 except OSError as exc:
1659 if exc.errno == errno.ENOSYS:
1660 # Python compiled on a more recent Linux version
1661 # than the current Linux kernel
1662 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1663 else:
1664 raise
1665
Victor Stinner9b1f4742016-09-06 16:18:52 -07001666 def test_getrandom_type(self):
1667 data = os.getrandom(16)
1668 self.assertIsInstance(data, bytes)
1669 self.assertEqual(len(data), 16)
1670
1671 def test_getrandom0(self):
1672 empty = os.getrandom(0)
1673 self.assertEqual(empty, b'')
1674
1675 def test_getrandom_random(self):
1676 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1677
1678 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1679 # resource /dev/random
1680
1681 def test_getrandom_nonblock(self):
1682 # The call must not fail. Check also that the flag exists
1683 try:
1684 os.getrandom(1, os.GRND_NONBLOCK)
1685 except BlockingIOError:
1686 # System urandom is not initialized yet
1687 pass
1688
1689 def test_getrandom_value(self):
1690 data1 = os.getrandom(16)
1691 data2 = os.getrandom(16)
1692 self.assertNotEqual(data1, data2)
1693
1694
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001695# os.urandom() doesn't use a file descriptor when it is implemented with the
1696# getentropy() function, the getrandom() function or the getrandom() syscall
1697OS_URANDOM_DONT_USE_FD = (
1698 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1699 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1700 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001701
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001702@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1703 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001704@unittest.skipIf(sys.platform == "vxworks",
1705 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001706class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001707 @unittest.skipUnless(resource, "test requires the resource module")
1708 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001709 # Check urandom() failing when it is not able to open /dev/random.
1710 # We spawn a new process to make the test more robust (if getrlimit()
1711 # failed to restore the file descriptor limit after this, the whole
1712 # test suite would crash; this actually happened on the OS X Tiger
1713 # buildbot).
1714 code = """if 1:
1715 import errno
1716 import os
1717 import resource
1718
1719 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1720 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1721 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001722 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001723 except OSError as e:
1724 assert e.errno == errno.EMFILE, e.errno
1725 else:
1726 raise AssertionError("OSError not raised")
1727 """
1728 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001729
Antoine Pitroue472aea2014-04-26 14:33:03 +02001730 def test_urandom_fd_closed(self):
1731 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1732 # closed.
1733 code = """if 1:
1734 import os
1735 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001736 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001737 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001738 with test.support.SuppressCrashReport():
1739 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001740 sys.stdout.buffer.write(os.urandom(4))
1741 """
1742 rc, out, err = assert_python_ok('-Sc', code)
1743
1744 def test_urandom_fd_reopened(self):
1745 # Issue #21207: urandom() should detect its fd to /dev/urandom
1746 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001747 self.addCleanup(support.unlink, support.TESTFN)
1748 create_file(support.TESTFN, b"x" * 256)
1749
Antoine Pitroue472aea2014-04-26 14:33:03 +02001750 code = """if 1:
1751 import os
1752 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001753 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001754 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001755 with test.support.SuppressCrashReport():
1756 for fd in range(3, 256):
1757 try:
1758 os.close(fd)
1759 except OSError:
1760 pass
1761 else:
1762 # Found the urandom fd (XXX hopefully)
1763 break
1764 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001765 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001766 new_fd = f.fileno()
1767 # Issue #26935: posix allows new_fd and fd to be equal but
1768 # some libc implementations have dup2 return an error in this
1769 # case.
1770 if new_fd != fd:
1771 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001772 sys.stdout.buffer.write(os.urandom(4))
1773 sys.stdout.buffer.write(os.urandom(4))
1774 """.format(TESTFN=support.TESTFN)
1775 rc, out, err = assert_python_ok('-Sc', code)
1776 self.assertEqual(len(out), 8)
1777 self.assertNotEqual(out[0:4], out[4:8])
1778 rc, out2, err2 = assert_python_ok('-Sc', code)
1779 self.assertEqual(len(out2), 8)
1780 self.assertNotEqual(out2, out)
1781
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001782
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001783@contextlib.contextmanager
1784def _execvpe_mockup(defpath=None):
1785 """
1786 Stubs out execv and execve functions when used as context manager.
1787 Records exec calls. The mock execv and execve functions always raise an
1788 exception as they would normally never return.
1789 """
1790 # A list of tuples containing (function name, first arg, args)
1791 # of calls to execv or execve that have been made.
1792 calls = []
1793
1794 def mock_execv(name, *args):
1795 calls.append(('execv', name, args))
1796 raise RuntimeError("execv called")
1797
1798 def mock_execve(name, *args):
1799 calls.append(('execve', name, args))
1800 raise OSError(errno.ENOTDIR, "execve called")
1801
1802 try:
1803 orig_execv = os.execv
1804 orig_execve = os.execve
1805 orig_defpath = os.defpath
1806 os.execv = mock_execv
1807 os.execve = mock_execve
1808 if defpath is not None:
1809 os.defpath = defpath
1810 yield calls
1811 finally:
1812 os.execv = orig_execv
1813 os.execve = orig_execve
1814 os.defpath = orig_defpath
1815
pxinwrf2d7ac72019-05-21 18:46:37 +08001816@unittest.skipUnless(hasattr(os, 'execv'),
1817 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001818class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001819 @unittest.skipIf(USING_LINUXTHREADS,
1820 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001821 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001822 self.assertRaises(OSError, os.execvpe, 'no such app-',
1823 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001824
Steve Dowerbce26262016-11-19 19:17:26 -08001825 def test_execv_with_bad_arglist(self):
1826 self.assertRaises(ValueError, os.execv, 'notepad', ())
1827 self.assertRaises(ValueError, os.execv, 'notepad', [])
1828 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1829 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1830
Thomas Heller6790d602007-08-30 17:15:14 +00001831 def test_execvpe_with_bad_arglist(self):
1832 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001833 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1834 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001835
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001836 @unittest.skipUnless(hasattr(os, '_execvpe'),
1837 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001838 def _test_internal_execvpe(self, test_type):
1839 program_path = os.sep + 'absolutepath'
1840 if test_type is bytes:
1841 program = b'executable'
1842 fullpath = os.path.join(os.fsencode(program_path), program)
1843 native_fullpath = fullpath
1844 arguments = [b'progname', 'arg1', 'arg2']
1845 else:
1846 program = 'executable'
1847 arguments = ['progname', 'arg1', 'arg2']
1848 fullpath = os.path.join(program_path, program)
1849 if os.name != "nt":
1850 native_fullpath = os.fsencode(fullpath)
1851 else:
1852 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001853 env = {'spam': 'beans'}
1854
Victor Stinnerb745a742010-05-18 17:17:23 +00001855 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001856 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001857 self.assertRaises(RuntimeError,
1858 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001859 self.assertEqual(len(calls), 1)
1860 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1861
Victor Stinnerb745a742010-05-18 17:17:23 +00001862 # test os._execvpe() with a relative path:
1863 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001864 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001865 self.assertRaises(OSError,
1866 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001867 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001868 self.assertSequenceEqual(calls[0],
1869 ('execve', native_fullpath, (arguments, env)))
1870
1871 # test os._execvpe() with a relative path:
1872 # os.get_exec_path() reads the 'PATH' variable
1873 with _execvpe_mockup() as calls:
1874 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001875 if test_type is bytes:
1876 env_path[b'PATH'] = program_path
1877 else:
1878 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001879 self.assertRaises(OSError,
1880 os._execvpe, program, arguments, env=env_path)
1881 self.assertEqual(len(calls), 1)
1882 self.assertSequenceEqual(calls[0],
1883 ('execve', native_fullpath, (arguments, env_path)))
1884
1885 def test_internal_execvpe_str(self):
1886 self._test_internal_execvpe(str)
1887 if os.name != "nt":
1888 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001889
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001890 def test_execve_invalid_env(self):
1891 args = [sys.executable, '-c', 'pass']
1892
Ville Skyttä49b27342017-08-03 09:00:59 +03001893 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001894 newenv = os.environ.copy()
1895 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1896 with self.assertRaises(ValueError):
1897 os.execve(args[0], args, newenv)
1898
Ville Skyttä49b27342017-08-03 09:00:59 +03001899 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001900 newenv = os.environ.copy()
1901 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1902 with self.assertRaises(ValueError):
1903 os.execve(args[0], args, newenv)
1904
Ville Skyttä49b27342017-08-03 09:00:59 +03001905 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001906 newenv = os.environ.copy()
1907 newenv["FRUIT=ORANGE"] = "lemon"
1908 with self.assertRaises(ValueError):
1909 os.execve(args[0], args, newenv)
1910
Alexey Izbyshev83460312018-10-20 03:28:22 +03001911 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1912 def test_execve_with_empty_path(self):
1913 # bpo-32890: Check GetLastError() misuse
1914 try:
1915 os.execve('', ['arg'], {})
1916 except OSError as e:
1917 self.assertTrue(e.winerror is None or e.winerror != 0)
1918 else:
1919 self.fail('No OSError raised')
1920
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001921
Serhiy Storchaka43767632013-11-03 21:31:38 +02001922@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001923class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001924 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001925 try:
1926 os.stat(support.TESTFN)
1927 except FileNotFoundError:
1928 exists = False
1929 except OSError as exc:
1930 exists = True
1931 self.fail("file %s must not exist; os.stat failed with %s"
1932 % (support.TESTFN, exc))
1933 else:
1934 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001935
Thomas Wouters477c8d52006-05-27 19:21:47 +00001936 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001937 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001938
1939 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001940 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001941
1942 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001943 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001944
1945 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001946 self.addCleanup(support.unlink, support.TESTFN)
1947
Victor Stinnere77c9742016-03-25 10:28:23 +01001948 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001949 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001950
1951 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001952 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001953
Thomas Wouters477c8d52006-05-27 19:21:47 +00001954 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001955 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001956
Victor Stinnere77c9742016-03-25 10:28:23 +01001957
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001958class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001959 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001960 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1961 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001962 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001963 def get_single(f):
1964 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001965 if hasattr(os, f):
1966 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001967 return helper
1968 for f in singles:
1969 locals()["test_"+f] = get_single(f)
1970
Benjamin Peterson7522c742009-01-19 21:00:09 +00001971 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001972 try:
1973 f(support.make_bad_fd(), *args)
1974 except OSError as e:
1975 self.assertEqual(e.errno, errno.EBADF)
1976 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001977 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001978 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001979
Serhiy Storchaka43767632013-11-03 21:31:38 +02001980 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001981 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001982 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001983
Serhiy Storchaka43767632013-11-03 21:31:38 +02001984 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001985 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001986 fd = support.make_bad_fd()
1987 # Make sure none of the descriptors we are about to close are
1988 # currently valid (issue 6542).
1989 for i in range(10):
1990 try: os.fstat(fd+i)
1991 except OSError:
1992 pass
1993 else:
1994 break
1995 if i < 2:
1996 raise unittest.SkipTest(
1997 "Unable to acquire a range of invalid file descriptors")
1998 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001999
Serhiy Storchaka43767632013-11-03 21:31:38 +02002000 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002001 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002002 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002003
Serhiy Storchaka43767632013-11-03 21:31:38 +02002004 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002005 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002006 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002007
Serhiy Storchaka43767632013-11-03 21:31:38 +02002008 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002009 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002010 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002011
Serhiy Storchaka43767632013-11-03 21:31:38 +02002012 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002013 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002014 self.check(os.pathconf, "PC_NAME_MAX")
2015 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002016
Serhiy Storchaka43767632013-11-03 21:31:38 +02002017 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002018 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002019 self.check(os.truncate, 0)
2020 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002021
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002023 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002024 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002025
Serhiy Storchaka43767632013-11-03 21:31:38 +02002026 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002027 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002028 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002029
Victor Stinner57ddf782014-01-08 15:21:28 +01002030 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2031 def test_readv(self):
2032 buf = bytearray(10)
2033 self.check(os.readv, [buf])
2034
Serhiy Storchaka43767632013-11-03 21:31:38 +02002035 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002036 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002037 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002038
Serhiy Storchaka43767632013-11-03 21:31:38 +02002039 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002040 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002041 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002042
Victor Stinner57ddf782014-01-08 15:21:28 +01002043 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2044 def test_writev(self):
2045 self.check(os.writev, [b'abc'])
2046
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002047 def test_inheritable(self):
2048 self.check(os.get_inheritable)
2049 self.check(os.set_inheritable, True)
2050
2051 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2052 'needs os.get_blocking() and os.set_blocking()')
2053 def test_blocking(self):
2054 self.check(os.get_blocking)
2055 self.check(os.set_blocking, True)
2056
Brian Curtin1b9df392010-11-24 20:24:31 +00002057
2058class LinkTests(unittest.TestCase):
2059 def setUp(self):
2060 self.file1 = support.TESTFN
2061 self.file2 = os.path.join(support.TESTFN + "2")
2062
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002063 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002064 for file in (self.file1, self.file2):
2065 if os.path.exists(file):
2066 os.unlink(file)
2067
Brian Curtin1b9df392010-11-24 20:24:31 +00002068 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002069 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002070
xdegaye6a55d092017-11-12 17:57:04 +01002071 try:
2072 os.link(file1, file2)
2073 except PermissionError as e:
2074 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00002075 with open(file1, "r") as f1, open(file2, "r") as f2:
2076 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2077
2078 def test_link(self):
2079 self._test_link(self.file1, self.file2)
2080
2081 def test_link_bytes(self):
2082 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2083 bytes(self.file2, sys.getfilesystemencoding()))
2084
Brian Curtinf498b752010-11-30 15:54:04 +00002085 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002086 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002087 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002088 except UnicodeError:
2089 raise unittest.SkipTest("Unable to encode for this platform.")
2090
Brian Curtinf498b752010-11-30 15:54:04 +00002091 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002092 self.file2 = self.file1 + "2"
2093 self._test_link(self.file1, self.file2)
2094
Serhiy Storchaka43767632013-11-03 21:31:38 +02002095@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2096class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002097 # uid_t and gid_t are 32-bit unsigned integers on Linux
2098 UID_OVERFLOW = (1 << 32)
2099 GID_OVERFLOW = (1 << 32)
2100
Serhiy Storchaka43767632013-11-03 21:31:38 +02002101 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2102 def test_setuid(self):
2103 if os.getuid() != 0:
2104 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002105 self.assertRaises(TypeError, os.setuid, 'not an int')
2106 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002107
Serhiy Storchaka43767632013-11-03 21:31:38 +02002108 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2109 def test_setgid(self):
2110 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2111 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002112 self.assertRaises(TypeError, os.setgid, 'not an int')
2113 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002114
Serhiy Storchaka43767632013-11-03 21:31:38 +02002115 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2116 def test_seteuid(self):
2117 if os.getuid() != 0:
2118 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002119 self.assertRaises(TypeError, os.setegid, 'not an int')
2120 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002121
Serhiy Storchaka43767632013-11-03 21:31:38 +02002122 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2123 def test_setegid(self):
2124 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2125 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002126 self.assertRaises(TypeError, os.setegid, 'not an int')
2127 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002128
Serhiy Storchaka43767632013-11-03 21:31:38 +02002129 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2130 def test_setreuid(self):
2131 if os.getuid() != 0:
2132 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002133 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2134 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2135 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2136 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002137
Serhiy Storchaka43767632013-11-03 21:31:38 +02002138 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2139 def test_setreuid_neg1(self):
2140 # Needs to accept -1. We run this in a subprocess to avoid
2141 # altering the test runner's process state (issue8045).
2142 subprocess.check_call([
2143 sys.executable, '-c',
2144 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002145
Serhiy Storchaka43767632013-11-03 21:31:38 +02002146 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2147 def test_setregid(self):
2148 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2149 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002150 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2151 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2152 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2153 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002154
Serhiy Storchaka43767632013-11-03 21:31:38 +02002155 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2156 def test_setregid_neg1(self):
2157 # Needs to accept -1. We run this in a subprocess to avoid
2158 # altering the test runner's process state (issue8045).
2159 subprocess.check_call([
2160 sys.executable, '-c',
2161 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002162
Serhiy Storchaka43767632013-11-03 21:31:38 +02002163@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2164class Pep383Tests(unittest.TestCase):
2165 def setUp(self):
2166 if support.TESTFN_UNENCODABLE:
2167 self.dir = support.TESTFN_UNENCODABLE
2168 elif support.TESTFN_NONASCII:
2169 self.dir = support.TESTFN_NONASCII
2170 else:
2171 self.dir = support.TESTFN
2172 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002173
Serhiy Storchaka43767632013-11-03 21:31:38 +02002174 bytesfn = []
2175 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002176 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002177 fn = os.fsencode(fn)
2178 except UnicodeEncodeError:
2179 return
2180 bytesfn.append(fn)
2181 add_filename(support.TESTFN_UNICODE)
2182 if support.TESTFN_UNENCODABLE:
2183 add_filename(support.TESTFN_UNENCODABLE)
2184 if support.TESTFN_NONASCII:
2185 add_filename(support.TESTFN_NONASCII)
2186 if not bytesfn:
2187 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002188
Serhiy Storchaka43767632013-11-03 21:31:38 +02002189 self.unicodefn = set()
2190 os.mkdir(self.dir)
2191 try:
2192 for fn in bytesfn:
2193 support.create_empty_file(os.path.join(self.bdir, fn))
2194 fn = os.fsdecode(fn)
2195 if fn in self.unicodefn:
2196 raise ValueError("duplicate filename")
2197 self.unicodefn.add(fn)
2198 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002199 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002200 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002201
Serhiy Storchaka43767632013-11-03 21:31:38 +02002202 def tearDown(self):
2203 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002204
Serhiy Storchaka43767632013-11-03 21:31:38 +02002205 def test_listdir(self):
2206 expected = self.unicodefn
2207 found = set(os.listdir(self.dir))
2208 self.assertEqual(found, expected)
2209 # test listdir without arguments
2210 current_directory = os.getcwd()
2211 try:
2212 os.chdir(os.sep)
2213 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2214 finally:
2215 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002216
Serhiy Storchaka43767632013-11-03 21:31:38 +02002217 def test_open(self):
2218 for fn in self.unicodefn:
2219 f = open(os.path.join(self.dir, fn), 'rb')
2220 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002221
Serhiy Storchaka43767632013-11-03 21:31:38 +02002222 @unittest.skipUnless(hasattr(os, 'statvfs'),
2223 "need os.statvfs()")
2224 def test_statvfs(self):
2225 # issue #9645
2226 for fn in self.unicodefn:
2227 # should not fail with file not found error
2228 fullname = os.path.join(self.dir, fn)
2229 os.statvfs(fullname)
2230
2231 def test_stat(self):
2232 for fn in self.unicodefn:
2233 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002234
Brian Curtineb24d742010-04-12 17:16:38 +00002235@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2236class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002237 def _kill(self, sig):
2238 # Start sys.executable as a subprocess and communicate from the
2239 # subprocess to the parent that the interpreter is ready. When it
2240 # becomes ready, send *sig* via os.kill to the subprocess and check
2241 # that the return code is equal to *sig*.
2242 import ctypes
2243 from ctypes import wintypes
2244 import msvcrt
2245
2246 # Since we can't access the contents of the process' stdout until the
2247 # process has exited, use PeekNamedPipe to see what's inside stdout
2248 # without waiting. This is done so we can tell that the interpreter
2249 # is started and running at a point where it could handle a signal.
2250 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2251 PeekNamedPipe.restype = wintypes.BOOL
2252 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2253 ctypes.POINTER(ctypes.c_char), # stdout buf
2254 wintypes.DWORD, # Buffer size
2255 ctypes.POINTER(wintypes.DWORD), # bytes read
2256 ctypes.POINTER(wintypes.DWORD), # bytes avail
2257 ctypes.POINTER(wintypes.DWORD)) # bytes left
2258 msg = "running"
2259 proc = subprocess.Popen([sys.executable, "-c",
2260 "import sys;"
2261 "sys.stdout.write('{}');"
2262 "sys.stdout.flush();"
2263 "input()".format(msg)],
2264 stdout=subprocess.PIPE,
2265 stderr=subprocess.PIPE,
2266 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002267 self.addCleanup(proc.stdout.close)
2268 self.addCleanup(proc.stderr.close)
2269 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002270
2271 count, max = 0, 100
2272 while count < max and proc.poll() is None:
2273 # Create a string buffer to store the result of stdout from the pipe
2274 buf = ctypes.create_string_buffer(len(msg))
2275 # Obtain the text currently in proc.stdout
2276 # Bytes read/avail/left are left as NULL and unused
2277 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2278 buf, ctypes.sizeof(buf), None, None, None)
2279 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2280 if buf.value:
2281 self.assertEqual(msg, buf.value.decode())
2282 break
2283 time.sleep(0.1)
2284 count += 1
2285 else:
2286 self.fail("Did not receive communication from the subprocess")
2287
Brian Curtineb24d742010-04-12 17:16:38 +00002288 os.kill(proc.pid, sig)
2289 self.assertEqual(proc.wait(), sig)
2290
2291 def test_kill_sigterm(self):
2292 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002293 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002294
2295 def test_kill_int(self):
2296 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002297 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002298
2299 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002300 tagname = "test_os_%s" % uuid.uuid1()
2301 m = mmap.mmap(-1, 1, tagname)
2302 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002303 # Run a script which has console control handling enabled.
2304 proc = subprocess.Popen([sys.executable,
2305 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002306 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002307 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2308 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002309 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002310 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002311 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002312 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002313 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002314 count += 1
2315 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002316 # Forcefully kill the process if we weren't able to signal it.
2317 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002318 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002319 os.kill(proc.pid, event)
2320 # proc.send_signal(event) could also be done here.
2321 # Allow time for the signal to be passed and the process to exit.
2322 time.sleep(0.5)
2323 if not proc.poll():
2324 # Forcefully kill the process if we weren't able to signal it.
2325 os.kill(proc.pid, signal.SIGINT)
2326 self.fail("subprocess did not stop on {}".format(name))
2327
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002328 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002329 def test_CTRL_C_EVENT(self):
2330 from ctypes import wintypes
2331 import ctypes
2332
2333 # Make a NULL value by creating a pointer with no argument.
2334 NULL = ctypes.POINTER(ctypes.c_int)()
2335 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2336 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2337 wintypes.BOOL)
2338 SetConsoleCtrlHandler.restype = wintypes.BOOL
2339
2340 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002341 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002342 # by subprocesses.
2343 SetConsoleCtrlHandler(NULL, 0)
2344
2345 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2346
2347 def test_CTRL_BREAK_EVENT(self):
2348 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2349
2350
Brian Curtind40e6f72010-07-08 21:39:08 +00002351@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002352class Win32ListdirTests(unittest.TestCase):
2353 """Test listdir on Windows."""
2354
2355 def setUp(self):
2356 self.created_paths = []
2357 for i in range(2):
2358 dir_name = 'SUB%d' % i
2359 dir_path = os.path.join(support.TESTFN, dir_name)
2360 file_name = 'FILE%d' % i
2361 file_path = os.path.join(support.TESTFN, file_name)
2362 os.makedirs(dir_path)
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03002363 with open(file_path, 'w', encoding='utf-8') as f:
Tim Golden781bbeb2013-10-25 20:24:06 +01002364 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2365 self.created_paths.extend([dir_name, file_name])
2366 self.created_paths.sort()
2367
2368 def tearDown(self):
2369 shutil.rmtree(support.TESTFN)
2370
2371 def test_listdir_no_extended_path(self):
2372 """Test when the path is not an "extended" path."""
2373 # unicode
2374 self.assertEqual(
2375 sorted(os.listdir(support.TESTFN)),
2376 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002377
Tim Golden781bbeb2013-10-25 20:24:06 +01002378 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002379 self.assertEqual(
2380 sorted(os.listdir(os.fsencode(support.TESTFN))),
2381 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002382
2383 def test_listdir_extended_path(self):
2384 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002385 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002386 # unicode
2387 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2388 self.assertEqual(
2389 sorted(os.listdir(path)),
2390 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002391
Tim Golden781bbeb2013-10-25 20:24:06 +01002392 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002393 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2394 self.assertEqual(
2395 sorted(os.listdir(path)),
2396 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002397
2398
Berker Peksage0b5b202018-08-15 13:03:41 +03002399@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2400class ReadlinkTests(unittest.TestCase):
2401 filelink = 'readlinktest'
2402 filelink_target = os.path.abspath(__file__)
2403 filelinkb = os.fsencode(filelink)
2404 filelinkb_target = os.fsencode(filelink_target)
2405
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002406 def assertPathEqual(self, left, right):
2407 left = os.path.normcase(left)
2408 right = os.path.normcase(right)
2409 if sys.platform == 'win32':
2410 # Bad practice to blindly strip the prefix as it may be required to
2411 # correctly refer to the file, but we're only comparing paths here.
2412 has_prefix = lambda p: p.startswith(
2413 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2414 if has_prefix(left):
2415 left = left[4:]
2416 if has_prefix(right):
2417 right = right[4:]
2418 self.assertEqual(left, right)
2419
Berker Peksage0b5b202018-08-15 13:03:41 +03002420 def setUp(self):
2421 self.assertTrue(os.path.exists(self.filelink_target))
2422 self.assertTrue(os.path.exists(self.filelinkb_target))
2423 self.assertFalse(os.path.exists(self.filelink))
2424 self.assertFalse(os.path.exists(self.filelinkb))
2425
2426 def test_not_symlink(self):
2427 filelink_target = FakePath(self.filelink_target)
2428 self.assertRaises(OSError, os.readlink, self.filelink_target)
2429 self.assertRaises(OSError, os.readlink, filelink_target)
2430
2431 def test_missing_link(self):
2432 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2433 self.assertRaises(FileNotFoundError, os.readlink,
2434 FakePath('missing-link'))
2435
2436 @support.skip_unless_symlink
2437 def test_pathlike(self):
2438 os.symlink(self.filelink_target, self.filelink)
2439 self.addCleanup(support.unlink, self.filelink)
2440 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002441 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002442
2443 @support.skip_unless_symlink
2444 def test_pathlike_bytes(self):
2445 os.symlink(self.filelinkb_target, self.filelinkb)
2446 self.addCleanup(support.unlink, self.filelinkb)
2447 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002448 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002449 self.assertIsInstance(path, bytes)
2450
2451 @support.skip_unless_symlink
2452 def test_bytes(self):
2453 os.symlink(self.filelinkb_target, self.filelinkb)
2454 self.addCleanup(support.unlink, self.filelinkb)
2455 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002456 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002457 self.assertIsInstance(path, bytes)
2458
2459
Tim Golden781bbeb2013-10-25 20:24:06 +01002460@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002461@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002462class Win32SymlinkTests(unittest.TestCase):
2463 filelink = 'filelinktest'
2464 filelink_target = os.path.abspath(__file__)
2465 dirlink = 'dirlinktest'
2466 dirlink_target = os.path.dirname(filelink_target)
2467 missing_link = 'missing link'
2468
2469 def setUp(self):
2470 assert os.path.exists(self.dirlink_target)
2471 assert os.path.exists(self.filelink_target)
2472 assert not os.path.exists(self.dirlink)
2473 assert not os.path.exists(self.filelink)
2474 assert not os.path.exists(self.missing_link)
2475
2476 def tearDown(self):
2477 if os.path.exists(self.filelink):
2478 os.remove(self.filelink)
2479 if os.path.exists(self.dirlink):
2480 os.rmdir(self.dirlink)
2481 if os.path.lexists(self.missing_link):
2482 os.remove(self.missing_link)
2483
2484 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002485 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002486 self.assertTrue(os.path.exists(self.dirlink))
2487 self.assertTrue(os.path.isdir(self.dirlink))
2488 self.assertTrue(os.path.islink(self.dirlink))
2489 self.check_stat(self.dirlink, self.dirlink_target)
2490
2491 def test_file_link(self):
2492 os.symlink(self.filelink_target, self.filelink)
2493 self.assertTrue(os.path.exists(self.filelink))
2494 self.assertTrue(os.path.isfile(self.filelink))
2495 self.assertTrue(os.path.islink(self.filelink))
2496 self.check_stat(self.filelink, self.filelink_target)
2497
2498 def _create_missing_dir_link(self):
2499 'Create a "directory" link to a non-existent target'
2500 linkname = self.missing_link
2501 if os.path.lexists(linkname):
2502 os.remove(linkname)
2503 target = r'c:\\target does not exist.29r3c740'
2504 assert not os.path.exists(target)
2505 target_is_dir = True
2506 os.symlink(target, linkname, target_is_dir)
2507
2508 def test_remove_directory_link_to_missing_target(self):
2509 self._create_missing_dir_link()
2510 # For compatibility with Unix, os.remove will check the
2511 # directory status and call RemoveDirectory if the symlink
2512 # was created with target_is_dir==True.
2513 os.remove(self.missing_link)
2514
Brian Curtind40e6f72010-07-08 21:39:08 +00002515 def test_isdir_on_directory_link_to_missing_target(self):
2516 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002517 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002518
Brian Curtind40e6f72010-07-08 21:39:08 +00002519 def test_rmdir_on_directory_link_to_missing_target(self):
2520 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002521 os.rmdir(self.missing_link)
2522
2523 def check_stat(self, link, target):
2524 self.assertEqual(os.stat(link), os.stat(target))
2525 self.assertNotEqual(os.lstat(link), os.stat(link))
2526
Brian Curtind25aef52011-06-13 15:16:04 -05002527 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002528 self.assertEqual(os.stat(bytes_link), os.stat(target))
2529 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002530
2531 def test_12084(self):
2532 level1 = os.path.abspath(support.TESTFN)
2533 level2 = os.path.join(level1, "level2")
2534 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002535 self.addCleanup(support.rmtree, level1)
2536
2537 os.mkdir(level1)
2538 os.mkdir(level2)
2539 os.mkdir(level3)
2540
2541 file1 = os.path.abspath(os.path.join(level1, "file1"))
2542 create_file(file1)
2543
2544 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002545 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002546 os.chdir(level2)
2547 link = os.path.join(level2, "link")
2548 os.symlink(os.path.relpath(file1), "link")
2549 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002550
Victor Stinnerae39d232016-03-24 17:12:55 +01002551 # Check os.stat calls from the same dir as the link
2552 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002553
Victor Stinnerae39d232016-03-24 17:12:55 +01002554 # Check os.stat calls from a dir below the link
2555 os.chdir(level1)
2556 self.assertEqual(os.stat(file1),
2557 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002558
Victor Stinnerae39d232016-03-24 17:12:55 +01002559 # Check os.stat calls from a dir above the link
2560 os.chdir(level3)
2561 self.assertEqual(os.stat(file1),
2562 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002563 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002564 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002565
SSE43c34aad2018-02-13 00:10:35 +07002566 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2567 and os.path.exists(r'C:\ProgramData'),
2568 'Test directories not found')
2569 def test_29248(self):
2570 # os.symlink() calls CreateSymbolicLink, which creates
2571 # the reparse data buffer with the print name stored
2572 # first, so the offset is always 0. CreateSymbolicLink
2573 # stores the "PrintName" DOS path (e.g. "C:\") first,
2574 # with an offset of 0, followed by the "SubstituteName"
2575 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2576 # the other hand, seems to have been created manually
2577 # with an inverted order.
2578 target = os.readlink(r'C:\Users\All Users')
2579 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2580
Steve Dower6921e732018-03-05 14:26:08 -08002581 def test_buffer_overflow(self):
2582 # Older versions would have a buffer overflow when detecting
2583 # whether a link source was a directory. This test ensures we
2584 # no longer crash, but does not otherwise validate the behavior
2585 segment = 'X' * 27
2586 path = os.path.join(*[segment] * 10)
2587 test_cases = [
2588 # overflow with absolute src
2589 ('\\' + path, segment),
2590 # overflow dest with relative src
2591 (segment, path),
2592 # overflow when joining src
2593 (path[:180], path[:180]),
2594 ]
2595 for src, dest in test_cases:
2596 try:
2597 os.symlink(src, dest)
2598 except FileNotFoundError:
2599 pass
2600 else:
2601 try:
2602 os.remove(dest)
2603 except OSError:
2604 pass
2605 # Also test with bytes, since that is a separate code path.
2606 try:
2607 os.symlink(os.fsencode(src), os.fsencode(dest))
2608 except FileNotFoundError:
2609 pass
2610 else:
2611 try:
2612 os.remove(dest)
2613 except OSError:
2614 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002615
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002616 def test_appexeclink(self):
2617 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002618 if not os.path.isdir(root):
2619 self.skipTest("test requires a WindowsApps directory")
2620
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002621 aliases = [os.path.join(root, a)
2622 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2623
2624 for alias in aliases:
2625 if support.verbose:
2626 print()
2627 print("Testing with", alias)
2628 st = os.lstat(alias)
2629 self.assertEqual(st, os.stat(alias))
2630 self.assertFalse(stat.S_ISLNK(st.st_mode))
2631 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2632 # testing the first one we see is sufficient
2633 break
2634 else:
2635 self.skipTest("test requires an app execution alias")
2636
Tim Golden0321cf22014-05-05 19:46:17 +01002637@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2638class Win32JunctionTests(unittest.TestCase):
2639 junction = 'junctiontest'
2640 junction_target = os.path.dirname(os.path.abspath(__file__))
2641
2642 def setUp(self):
2643 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002644 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002645
2646 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002647 if os.path.lexists(self.junction):
2648 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002649
2650 def test_create_junction(self):
2651 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002652 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002653 self.assertTrue(os.path.exists(self.junction))
2654 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002655 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2656 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002657
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002658 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002659 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002660 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2661 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002662
2663 def test_unlink_removes_junction(self):
2664 _winapi.CreateJunction(self.junction_target, self.junction)
2665 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002666 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002667
2668 os.unlink(self.junction)
2669 self.assertFalse(os.path.exists(self.junction))
2670
Mark Becwarb82bfac2019-02-02 16:08:23 -05002671@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2672class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002673 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002674 nt = support.import_module('nt')
2675 ctypes = support.import_module('ctypes')
2676 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002677
2678 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2679 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2680
2681 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2682 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2683 ctypes.wintypes.LPDWORD)
2684
2685 # This is a pseudo-handle that doesn't need to be closed
2686 hproc = kernel.GetCurrentProcess()
2687
2688 handle_count = ctypes.wintypes.DWORD()
2689 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2690 self.assertEqual(1, ok)
2691
2692 before_count = handle_count.value
2693
2694 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002695 filenames = [
2696 r'\\?\C:',
2697 r'\\?\NUL',
2698 r'\\?\CONIN',
2699 __file__,
2700 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002701
Berker Peksag6ef726a2019-04-22 18:46:28 +03002702 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002703 for name in filenames:
2704 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002705 nt._getfinalpathname(name)
2706 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002707 # Failure is expected
2708 pass
2709 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002710 os.stat(name)
2711 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002712 pass
2713
2714 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2715 self.assertEqual(1, ok)
2716
2717 handle_delta = handle_count.value - before_count
2718
2719 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002720
Jason R. Coombs3a092862013-05-27 23:21:28 -04002721@support.skip_unless_symlink
2722class NonLocalSymlinkTests(unittest.TestCase):
2723
2724 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002725 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002726 Create this structure:
2727
2728 base
2729 \___ some_dir
2730 """
2731 os.makedirs('base/some_dir')
2732
2733 def tearDown(self):
2734 shutil.rmtree('base')
2735
2736 def test_directory_link_nonlocal(self):
2737 """
2738 The symlink target should resolve relative to the link, not relative
2739 to the current directory.
2740
2741 Then, link base/some_link -> base/some_dir and ensure that some_link
2742 is resolved as a directory.
2743
2744 In issue13772, it was discovered that directory detection failed if
2745 the symlink target was not specified relative to the current
2746 directory, which was a defect in the implementation.
2747 """
2748 src = os.path.join('base', 'some_link')
2749 os.symlink('some_dir', src)
2750 assert os.path.isdir(src)
2751
2752
Victor Stinnere8d51452010-08-19 01:05:19 +00002753class FSEncodingTests(unittest.TestCase):
2754 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002755 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2756 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002757
Victor Stinnere8d51452010-08-19 01:05:19 +00002758 def test_identity(self):
2759 # assert fsdecode(fsencode(x)) == x
2760 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2761 try:
2762 bytesfn = os.fsencode(fn)
2763 except UnicodeEncodeError:
2764 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002765 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002766
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002767
Brett Cannonefb00c02012-02-29 18:31:31 -05002768
2769class DeviceEncodingTests(unittest.TestCase):
2770
2771 def test_bad_fd(self):
2772 # Return None when an fd doesn't actually exist.
2773 self.assertIsNone(os.device_encoding(123456))
2774
Paul Monson62dfd7d2019-04-25 11:36:45 -07002775 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002776 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002777 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002778 def test_device_encoding(self):
2779 encoding = os.device_encoding(0)
2780 self.assertIsNotNone(encoding)
2781 self.assertTrue(codecs.lookup(encoding))
2782
2783
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002784class PidTests(unittest.TestCase):
2785 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2786 def test_getppid(self):
2787 p = subprocess.Popen([sys.executable, '-c',
2788 'import os; print(os.getppid())'],
2789 stdout=subprocess.PIPE)
2790 stdout, _ = p.communicate()
2791 # We are the parent of our subprocess
2792 self.assertEqual(int(stdout), os.getpid())
2793
Victor Stinner9bee32b2020-04-22 16:30:35 +02002794 def check_waitpid(self, code, exitcode, callback=None):
2795 if sys.platform == 'win32':
2796 # On Windows, os.spawnv() simply joins arguments with spaces:
2797 # arguments need to be quoted
2798 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2799 else:
2800 args = [sys.executable, '-c', code]
2801 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002802
Victor Stinner9bee32b2020-04-22 16:30:35 +02002803 if callback is not None:
2804 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002805
Victor Stinner9bee32b2020-04-22 16:30:35 +02002806 # don't use support.wait_process() to test directly os.waitpid()
2807 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002808 pid2, status = os.waitpid(pid, 0)
2809 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2810 self.assertEqual(pid2, pid)
2811
Victor Stinner9bee32b2020-04-22 16:30:35 +02002812 def test_waitpid(self):
2813 self.check_waitpid(code='pass', exitcode=0)
2814
2815 def test_waitstatus_to_exitcode(self):
2816 exitcode = 23
2817 code = f'import sys; sys.exit({exitcode})'
2818 self.check_waitpid(code, exitcode=exitcode)
2819
2820 with self.assertRaises(TypeError):
2821 os.waitstatus_to_exitcode(0.0)
2822
2823 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2824 def test_waitpid_windows(self):
2825 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2826 # with exit code larger than INT_MAX.
2827 STATUS_CONTROL_C_EXIT = 0xC000013A
2828 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2829 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2830
2831 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2832 def test_waitstatus_to_exitcode_windows(self):
2833 max_exitcode = 2 ** 32 - 1
2834 for exitcode in (0, 1, 5, max_exitcode):
2835 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2836 exitcode)
2837
2838 # invalid values
2839 with self.assertRaises(ValueError):
2840 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2841 with self.assertRaises(OverflowError):
2842 os.waitstatus_to_exitcode(-1)
2843
Victor Stinner65a796e2020-04-01 18:49:29 +02002844 # Skip the test on Windows
2845 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2846 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002847 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002848 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002849
Victor Stinner9bee32b2020-04-22 16:30:35 +02002850 def kill_process(pid):
2851 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002852
Victor Stinner9bee32b2020-04-22 16:30:35 +02002853 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002854
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002855
Victor Stinner4659ccf2016-09-14 10:57:00 +02002856class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002857 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002858 self.exitcode = 17
2859
2860 filename = support.TESTFN
2861 self.addCleanup(support.unlink, filename)
2862
2863 if not with_env:
2864 code = 'import sys; sys.exit(%s)' % self.exitcode
2865 else:
2866 self.env = dict(os.environ)
2867 # create an unique key
2868 self.key = str(uuid.uuid4())
2869 self.env[self.key] = self.key
2870 # read the variable from os.environ to check that it exists
2871 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2872 % (self.key, self.exitcode))
2873
2874 with open(filename, "w") as fp:
2875 fp.write(code)
2876
Berker Peksag81816462016-09-15 20:19:47 +03002877 args = [sys.executable, filename]
2878 if use_bytes:
2879 args = [os.fsencode(a) for a in args]
2880 self.env = {os.fsencode(k): os.fsencode(v)
2881 for k, v in self.env.items()}
2882
2883 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002884
Berker Peksag4af23d72016-09-15 20:32:44 +03002885 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002886 def test_spawnl(self):
2887 args = self.create_args()
2888 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2889 self.assertEqual(exitcode, self.exitcode)
2890
Berker Peksag4af23d72016-09-15 20:32:44 +03002891 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002892 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002893 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002894 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2895 self.assertEqual(exitcode, self.exitcode)
2896
Berker Peksag4af23d72016-09-15 20:32:44 +03002897 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002898 def test_spawnlp(self):
2899 args = self.create_args()
2900 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2901 self.assertEqual(exitcode, self.exitcode)
2902
Berker Peksag4af23d72016-09-15 20:32:44 +03002903 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002904 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002905 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002906 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2907 self.assertEqual(exitcode, self.exitcode)
2908
Berker Peksag4af23d72016-09-15 20:32:44 +03002909 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002910 def test_spawnv(self):
2911 args = self.create_args()
2912 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2913 self.assertEqual(exitcode, self.exitcode)
2914
Victor Stinner9bee32b2020-04-22 16:30:35 +02002915 # Test for PyUnicode_FSConverter()
2916 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
2917 self.assertEqual(exitcode, self.exitcode)
2918
Berker Peksag4af23d72016-09-15 20:32:44 +03002919 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002920 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002921 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002922 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2923 self.assertEqual(exitcode, self.exitcode)
2924
Berker Peksag4af23d72016-09-15 20:32:44 +03002925 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002926 def test_spawnvp(self):
2927 args = self.create_args()
2928 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2929 self.assertEqual(exitcode, self.exitcode)
2930
Berker Peksag4af23d72016-09-15 20:32:44 +03002931 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002932 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002933 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002934 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2935 self.assertEqual(exitcode, self.exitcode)
2936
Berker Peksag4af23d72016-09-15 20:32:44 +03002937 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002938 def test_nowait(self):
2939 args = self.create_args()
2940 pid = os.spawnv(os.P_NOWAIT, args[0], args)
Victor Stinner278c1e12020-03-31 20:08:12 +02002941 support.wait_process(pid, exitcode=self.exitcode)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002942
Berker Peksag4af23d72016-09-15 20:32:44 +03002943 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002944 def test_spawnve_bytes(self):
2945 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2946 args = self.create_args(with_env=True, use_bytes=True)
2947 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2948 self.assertEqual(exitcode, self.exitcode)
2949
Steve Dower859fd7b2016-11-19 18:53:19 -08002950 @requires_os_func('spawnl')
2951 def test_spawnl_noargs(self):
2952 args = self.create_args()
2953 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002954 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002955
2956 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002957 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002958 args = self.create_args()
2959 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002960 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002961
2962 @requires_os_func('spawnv')
2963 def test_spawnv_noargs(self):
2964 args = self.create_args()
2965 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2966 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002967 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2968 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002969
2970 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002971 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002972 args = self.create_args()
2973 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2974 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002975 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2976 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002977
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002978 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002979 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002980
Ville Skyttä49b27342017-08-03 09:00:59 +03002981 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002982 newenv = os.environ.copy()
2983 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2984 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002985 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002986 except ValueError:
2987 pass
2988 else:
2989 self.assertEqual(exitcode, 127)
2990
Ville Skyttä49b27342017-08-03 09:00:59 +03002991 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002992 newenv = os.environ.copy()
2993 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2994 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002995 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002996 except ValueError:
2997 pass
2998 else:
2999 self.assertEqual(exitcode, 127)
3000
Ville Skyttä49b27342017-08-03 09:00:59 +03003001 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003002 newenv = os.environ.copy()
3003 newenv["FRUIT=ORANGE"] = "lemon"
3004 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003005 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003006 except ValueError:
3007 pass
3008 else:
3009 self.assertEqual(exitcode, 127)
3010
Ville Skyttä49b27342017-08-03 09:00:59 +03003011 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03003012 filename = support.TESTFN
3013 self.addCleanup(support.unlink, filename)
3014 with open(filename, "w") as fp:
3015 fp.write('import sys, os\n'
3016 'if os.getenv("FRUIT") != "orange=lemon":\n'
3017 ' raise AssertionError')
3018 args = [sys.executable, filename]
3019 newenv = os.environ.copy()
3020 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003021 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003022 self.assertEqual(exitcode, 0)
3023
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003024 @requires_os_func('spawnve')
3025 def test_spawnve_invalid_env(self):
3026 self._test_invalid_env(os.spawnve)
3027
3028 @requires_os_func('spawnvpe')
3029 def test_spawnvpe_invalid_env(self):
3030 self._test_invalid_env(os.spawnvpe)
3031
Serhiy Storchaka77703942017-06-25 07:33:01 +03003032
Brian Curtin0151b8e2010-09-24 13:43:43 +00003033# The introduction of this TestCase caused at least two different errors on
3034# *nix buildbots. Temporarily skip this to let the buildbots move along.
3035@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00003036@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3037class LoginTests(unittest.TestCase):
3038 def test_getlogin(self):
3039 user_name = os.getlogin()
3040 self.assertNotEqual(len(user_name), 0)
3041
3042
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003043@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3044 "needs os.getpriority and os.setpriority")
3045class ProgramPriorityTests(unittest.TestCase):
3046 """Tests for os.getpriority() and os.setpriority()."""
3047
3048 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003049
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003050 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3051 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3052 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003053 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3054 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01003055 raise unittest.SkipTest("unable to reliably test setpriority "
3056 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003057 else:
3058 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003059 finally:
3060 try:
3061 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3062 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00003063 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003064 raise
3065
3066
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003067class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003068
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003069 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003070
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003071 def __init__(self, conn):
3072 asynchat.async_chat.__init__(self, conn)
3073 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003074 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003075 self.closed = False
3076 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003077
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003078 def handle_read(self):
3079 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003080 if self.accumulate:
3081 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003082
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003083 def get_data(self):
3084 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003087 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003088 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003089
3090 def handle_error(self):
3091 raise
3092
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 def __init__(self, address):
3094 threading.Thread.__init__(self)
3095 asyncore.dispatcher.__init__(self)
3096 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3097 self.bind(address)
3098 self.listen(5)
3099 self.host, self.port = self.socket.getsockname()[:2]
3100 self.handler_instance = None
3101 self._active = False
3102 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003104 # --- public API
3105
3106 @property
3107 def running(self):
3108 return self._active
3109
3110 def start(self):
3111 assert not self.running
3112 self.__flag = threading.Event()
3113 threading.Thread.start(self)
3114 self.__flag.wait()
3115
3116 def stop(self):
3117 assert self.running
3118 self._active = False
3119 self.join()
3120
3121 def wait(self):
3122 # wait for handler connection to be closed, then stop the server
3123 while not getattr(self.handler_instance, "closed", False):
3124 time.sleep(0.001)
3125 self.stop()
3126
3127 # --- internals
3128
3129 def run(self):
3130 self._active = True
3131 self.__flag.set()
3132 while self._active and asyncore.socket_map:
3133 self._active_lock.acquire()
3134 asyncore.loop(timeout=0.001, count=1)
3135 self._active_lock.release()
3136 asyncore.close_all()
3137
3138 def handle_accept(self):
3139 conn, addr = self.accept()
3140 self.handler_instance = self.Handler(conn)
3141
3142 def handle_connect(self):
3143 self.close()
3144 handle_read = handle_connect
3145
3146 def writable(self):
3147 return 0
3148
3149 def handle_error(self):
3150 raise
3151
3152
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003153@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3154class TestSendfile(unittest.TestCase):
3155
Victor Stinner8c663fd2017-11-08 14:44:44 -08003156 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003157 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003158 not sys.platform.startswith("solaris") and \
3159 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003160 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3161 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003162 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3163 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003164
3165 @classmethod
3166 def setUpClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003167 cls.key = threading_helper.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01003168 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003169
3170 @classmethod
3171 def tearDownClass(cls):
Hai Shie80697d2020-05-28 06:10:27 +08003172 threading_helper.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003173 support.unlink(support.TESTFN)
3174
3175 def setUp(self):
Serhiy Storchaka16994912020-04-25 10:06:29 +03003176 self.server = SendfileTestServer((socket_helper.HOST, 0))
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003177 self.server.start()
3178 self.client = socket.socket()
3179 self.client.connect((self.server.host, self.server.port))
3180 self.client.settimeout(1)
3181 # synchronize by waiting for "220 ready" response
3182 self.client.recv(1024)
3183 self.sockno = self.client.fileno()
3184 self.file = open(support.TESTFN, 'rb')
3185 self.fileno = self.file.fileno()
3186
3187 def tearDown(self):
3188 self.file.close()
3189 self.client.close()
3190 if self.server.running:
3191 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003192 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003193
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003194 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003195 """A higher level wrapper representing how an application is
3196 supposed to use sendfile().
3197 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003198 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003199 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003200 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003201 except OSError as err:
3202 if err.errno == errno.ECONNRESET:
3203 # disconnected
3204 raise
3205 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3206 # we have to retry send data
3207 continue
3208 else:
3209 raise
3210
3211 def test_send_whole_file(self):
3212 # normal send
3213 total_sent = 0
3214 offset = 0
3215 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003216 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003217 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3218 if sent == 0:
3219 break
3220 offset += sent
3221 total_sent += sent
3222 self.assertTrue(sent <= nbytes)
3223 self.assertEqual(offset, total_sent)
3224
3225 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003226 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003227 self.client.close()
3228 self.server.wait()
3229 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003230 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003231 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003232
3233 def test_send_at_certain_offset(self):
3234 # start sending a file at a certain offset
3235 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003236 offset = len(self.DATA) // 2
3237 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003238 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003239 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003240 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3241 if sent == 0:
3242 break
3243 offset += sent
3244 total_sent += sent
3245 self.assertTrue(sent <= nbytes)
3246
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003247 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003248 self.client.close()
3249 self.server.wait()
3250 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003251 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003252 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003253 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003254 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003255
3256 def test_offset_overflow(self):
3257 # specify an offset > file size
3258 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003259 try:
3260 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3261 except OSError as e:
3262 # Solaris can raise EINVAL if offset >= file length, ignore.
3263 if e.errno != errno.EINVAL:
3264 raise
3265 else:
3266 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003267 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003268 self.client.close()
3269 self.server.wait()
3270 data = self.server.handler_instance.get_data()
3271 self.assertEqual(data, b'')
3272
3273 def test_invalid_offset(self):
3274 with self.assertRaises(OSError) as cm:
3275 os.sendfile(self.sockno, self.fileno, -1, 4096)
3276 self.assertEqual(cm.exception.errno, errno.EINVAL)
3277
Martin Panterbf19d162015-09-09 01:01:13 +00003278 def test_keywords(self):
3279 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003280 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3281 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003282 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003283 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3284 offset=0, count=4096,
3285 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003286
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003287 # --- headers / trailers tests
3288
Serhiy Storchaka43767632013-11-03 21:31:38 +02003289 @requires_headers_trailers
3290 def test_headers(self):
3291 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003292 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003293 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003294 headers=[b"x" * 512, b"y" * 256])
3295 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003296 total_sent += sent
3297 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003298 while total_sent < len(expected_data):
3299 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003300 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3301 offset, nbytes)
3302 if sent == 0:
3303 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003304 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003305 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003306 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003307
Serhiy Storchaka43767632013-11-03 21:31:38 +02003308 self.assertEqual(total_sent, len(expected_data))
3309 self.client.close()
3310 self.server.wait()
3311 data = self.server.handler_instance.get_data()
3312 self.assertEqual(hash(data), hash(expected_data))
3313
3314 @requires_headers_trailers
3315 def test_trailers(self):
3316 TESTFN2 = support.TESTFN + "2"
3317 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003318
3319 self.addCleanup(support.unlink, TESTFN2)
3320 create_file(TESTFN2, file_data)
3321
3322 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003323 os.sendfile(self.sockno, f.fileno(), 0, 5,
3324 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003325 self.client.close()
3326 self.server.wait()
3327 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003328 self.assertEqual(data, b"abcde123456789")
3329
3330 @requires_headers_trailers
3331 @requires_32b
3332 def test_headers_overflow_32bits(self):
3333 self.server.handler_instance.accumulate = False
3334 with self.assertRaises(OSError) as cm:
3335 os.sendfile(self.sockno, self.fileno, 0, 0,
3336 headers=[b"x" * 2**16] * 2**15)
3337 self.assertEqual(cm.exception.errno, errno.EINVAL)
3338
3339 @requires_headers_trailers
3340 @requires_32b
3341 def test_trailers_overflow_32bits(self):
3342 self.server.handler_instance.accumulate = False
3343 with self.assertRaises(OSError) as cm:
3344 os.sendfile(self.sockno, self.fileno, 0, 0,
3345 trailers=[b"x" * 2**16] * 2**15)
3346 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003347
Serhiy Storchaka43767632013-11-03 21:31:38 +02003348 @requires_headers_trailers
3349 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3350 'test needs os.SF_NODISKIO')
3351 def test_flags(self):
3352 try:
3353 os.sendfile(self.sockno, self.fileno, 0, 4096,
3354 flags=os.SF_NODISKIO)
3355 except OSError as err:
3356 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3357 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003358
3359
Larry Hastings9cf065c2012-06-22 16:30:09 -07003360def supports_extended_attributes():
3361 if not hasattr(os, "setxattr"):
3362 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003363
Larry Hastings9cf065c2012-06-22 16:30:09 -07003364 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003365 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003366 try:
3367 os.setxattr(fp.fileno(), b"user.test", b"")
3368 except OSError:
3369 return False
3370 finally:
3371 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003372
3373 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003374
3375
3376@unittest.skipUnless(supports_extended_attributes(),
3377 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003378# Kernels < 2.6.39 don't respect setxattr flags.
3379@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003380class ExtendedAttributeTests(unittest.TestCase):
3381
Larry Hastings9cf065c2012-06-22 16:30:09 -07003382 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003383 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003384 self.addCleanup(support.unlink, fn)
3385 create_file(fn)
3386
Benjamin Peterson799bd802011-08-31 22:15:17 -04003387 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003388 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003389 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003390
Victor Stinnerf12e5062011-10-16 22:12:03 +02003391 init_xattr = listxattr(fn)
3392 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003393
Larry Hastings9cf065c2012-06-22 16:30:09 -07003394 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003395 xattr = set(init_xattr)
3396 xattr.add("user.test")
3397 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003398 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3399 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3400 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003401
Benjamin Peterson799bd802011-08-31 22:15:17 -04003402 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003403 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003404 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003405
Benjamin Peterson799bd802011-08-31 22:15:17 -04003406 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003407 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003408 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003409
Larry Hastings9cf065c2012-06-22 16:30:09 -07003410 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003411 xattr.add("user.test2")
3412 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003413 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003414
Benjamin Peterson799bd802011-08-31 22:15:17 -04003415 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003416 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003417 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003418
Victor Stinnerf12e5062011-10-16 22:12:03 +02003419 xattr.remove("user.test")
3420 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003421 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3422 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3423 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3424 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003425 many = sorted("user.test{}".format(i) for i in range(100))
3426 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003427 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003428 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003429
Larry Hastings9cf065c2012-06-22 16:30:09 -07003430 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003431 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003432 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003433
3434 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3435 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003436
3437 def test_simple(self):
3438 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3439 os.listxattr)
3440
3441 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003442 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3443 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003444
3445 def test_fds(self):
3446 def getxattr(path, *args):
3447 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003448 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003449 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003450 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003451 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003452 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003453 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003454 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003455 def listxattr(path, *args):
3456 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003457 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003458 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3459
3460
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003461@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3462class TermsizeTests(unittest.TestCase):
3463 def test_does_not_crash(self):
3464 """Check if get_terminal_size() returns a meaningful value.
3465
3466 There's no easy portable way to actually check the size of the
3467 terminal, so let's check if it returns something sensible instead.
3468 """
3469 try:
3470 size = os.get_terminal_size()
3471 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003472 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003473 # Under win32 a generic OSError can be thrown if the
3474 # handle cannot be retrieved
3475 self.skipTest("failed to query terminal size")
3476 raise
3477
Antoine Pitroucfade362012-02-08 23:48:59 +01003478 self.assertGreaterEqual(size.columns, 0)
3479 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003480
3481 def test_stty_match(self):
3482 """Check if stty returns the same results
3483
3484 stty actually tests stdin, so get_terminal_size is invoked on
3485 stdin explicitly. If stty succeeded, then get_terminal_size()
3486 should work too.
3487 """
3488 try:
Batuhan Taskayad5a980a2020-05-17 01:38:02 +03003489 size = (
3490 subprocess.check_output(
3491 ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3492 ).split()
3493 )
xdegaye6a55d092017-11-12 17:57:04 +01003494 except (FileNotFoundError, subprocess.CalledProcessError,
3495 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003496 self.skipTest("stty invocation failed")
3497 expected = (int(size[1]), int(size[0])) # reversed order
3498
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003499 try:
3500 actual = os.get_terminal_size(sys.__stdin__.fileno())
3501 except OSError as e:
3502 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3503 # Under win32 a generic OSError can be thrown if the
3504 # handle cannot be retrieved
3505 self.skipTest("failed to query terminal size")
3506 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003507 self.assertEqual(expected, actual)
3508
3509
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003510@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003511@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003512class MemfdCreateTests(unittest.TestCase):
3513 def test_memfd_create(self):
3514 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3515 self.assertNotEqual(fd, -1)
3516 self.addCleanup(os.close, fd)
3517 self.assertFalse(os.get_inheritable(fd))
3518 with open(fd, "wb", closefd=False) as f:
3519 f.write(b'memfd_create')
3520 self.assertEqual(f.tell(), 12)
3521
3522 fd2 = os.memfd_create("Hi")
3523 self.addCleanup(os.close, fd2)
3524 self.assertFalse(os.get_inheritable(fd2))
3525
3526
Victor Stinner292c8352012-10-30 02:17:38 +01003527class OSErrorTests(unittest.TestCase):
3528 def setUp(self):
3529 class Str(str):
3530 pass
3531
Victor Stinnerafe17062012-10-31 22:47:43 +01003532 self.bytes_filenames = []
3533 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003534 if support.TESTFN_UNENCODABLE is not None:
3535 decoded = support.TESTFN_UNENCODABLE
3536 else:
3537 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003538 self.unicode_filenames.append(decoded)
3539 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003540 if support.TESTFN_UNDECODABLE is not None:
3541 encoded = support.TESTFN_UNDECODABLE
3542 else:
3543 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003544 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003545 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003546 self.bytes_filenames.append(memoryview(encoded))
3547
3548 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003549
3550 def test_oserror_filename(self):
3551 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003552 (self.filenames, os.chdir,),
3553 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003554 (self.filenames, os.lstat,),
3555 (self.filenames, os.open, os.O_RDONLY),
3556 (self.filenames, os.rmdir,),
3557 (self.filenames, os.stat,),
3558 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003559 ]
3560 if sys.platform == "win32":
3561 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003562 (self.bytes_filenames, os.rename, b"dst"),
3563 (self.bytes_filenames, os.replace, b"dst"),
3564 (self.unicode_filenames, os.rename, "dst"),
3565 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003566 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003567 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003568 else:
3569 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003570 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003571 (self.filenames, os.rename, "dst"),
3572 (self.filenames, os.replace, "dst"),
3573 ))
3574 if hasattr(os, "chown"):
3575 funcs.append((self.filenames, os.chown, 0, 0))
3576 if hasattr(os, "lchown"):
3577 funcs.append((self.filenames, os.lchown, 0, 0))
3578 if hasattr(os, "truncate"):
3579 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003580 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003581 funcs.append((self.filenames, os.chflags, 0))
3582 if hasattr(os, "lchflags"):
3583 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003584 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003585 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003586 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003587 if sys.platform == "win32":
3588 funcs.append((self.bytes_filenames, os.link, b"dst"))
3589 funcs.append((self.unicode_filenames, os.link, "dst"))
3590 else:
3591 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003592 if hasattr(os, "listxattr"):
3593 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003594 (self.filenames, os.listxattr,),
3595 (self.filenames, os.getxattr, "user.test"),
3596 (self.filenames, os.setxattr, "user.test", b'user'),
3597 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003598 ))
3599 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003600 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003601 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003602 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003603
Steve Dowercc16be82016-09-08 10:35:16 -07003604
Victor Stinnerafe17062012-10-31 22:47:43 +01003605 for filenames, func, *func_args in funcs:
3606 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003607 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003608 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003609 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003610 else:
3611 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3612 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003613 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003614 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003615 except UnicodeDecodeError:
3616 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003617 else:
3618 self.fail("No exception thrown by {}".format(func))
3619
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003620class CPUCountTests(unittest.TestCase):
3621 def test_cpu_count(self):
3622 cpus = os.cpu_count()
3623 if cpus is not None:
3624 self.assertIsInstance(cpus, int)
3625 self.assertGreater(cpus, 0)
3626 else:
3627 self.skipTest("Could not determine the number of CPUs")
3628
Victor Stinnerdaf45552013-08-28 00:53:59 +02003629
3630class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003631 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003632 fd = os.open(__file__, os.O_RDONLY)
3633 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003634 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003635
Victor Stinnerdaf45552013-08-28 00:53:59 +02003636 os.set_inheritable(fd, True)
3637 self.assertEqual(os.get_inheritable(fd), True)
3638
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003639 @unittest.skipIf(fcntl is None, "need fcntl")
3640 def test_get_inheritable_cloexec(self):
3641 fd = os.open(__file__, os.O_RDONLY)
3642 self.addCleanup(os.close, fd)
3643 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003644
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003645 # clear FD_CLOEXEC flag
3646 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3647 flags &= ~fcntl.FD_CLOEXEC
3648 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003649
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003650 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003651
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003652 @unittest.skipIf(fcntl is None, "need fcntl")
3653 def test_set_inheritable_cloexec(self):
3654 fd = os.open(__file__, os.O_RDONLY)
3655 self.addCleanup(os.close, fd)
3656 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3657 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003658
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003659 os.set_inheritable(fd, True)
3660 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3661 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003662
Victor Stinnerdaf45552013-08-28 00:53:59 +02003663 def test_open(self):
3664 fd = os.open(__file__, os.O_RDONLY)
3665 self.addCleanup(os.close, fd)
3666 self.assertEqual(os.get_inheritable(fd), False)
3667
3668 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3669 def test_pipe(self):
3670 rfd, wfd = os.pipe()
3671 self.addCleanup(os.close, rfd)
3672 self.addCleanup(os.close, wfd)
3673 self.assertEqual(os.get_inheritable(rfd), False)
3674 self.assertEqual(os.get_inheritable(wfd), False)
3675
3676 def test_dup(self):
3677 fd1 = os.open(__file__, os.O_RDONLY)
3678 self.addCleanup(os.close, fd1)
3679
3680 fd2 = os.dup(fd1)
3681 self.addCleanup(os.close, fd2)
3682 self.assertEqual(os.get_inheritable(fd2), False)
3683
Zackery Spytz5be66602019-08-23 12:38:41 -06003684 def test_dup_standard_stream(self):
3685 fd = os.dup(1)
3686 self.addCleanup(os.close, fd)
3687 self.assertGreater(fd, 0)
3688
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003689 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3690 def test_dup_nul(self):
3691 # os.dup() was creating inheritable fds for character files.
3692 fd1 = os.open('NUL', os.O_RDONLY)
3693 self.addCleanup(os.close, fd1)
3694 fd2 = os.dup(fd1)
3695 self.addCleanup(os.close, fd2)
3696 self.assertFalse(os.get_inheritable(fd2))
3697
Victor Stinnerdaf45552013-08-28 00:53:59 +02003698 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3699 def test_dup2(self):
3700 fd = os.open(__file__, os.O_RDONLY)
3701 self.addCleanup(os.close, fd)
3702
3703 # inheritable by default
3704 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003705 self.addCleanup(os.close, fd2)
3706 self.assertEqual(os.dup2(fd, fd2), fd2)
3707 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003708
3709 # force non-inheritable
3710 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003711 self.addCleanup(os.close, fd3)
3712 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3713 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003714
3715 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3716 def test_openpty(self):
3717 master_fd, slave_fd = os.openpty()
3718 self.addCleanup(os.close, master_fd)
3719 self.addCleanup(os.close, slave_fd)
3720 self.assertEqual(os.get_inheritable(master_fd), False)
3721 self.assertEqual(os.get_inheritable(slave_fd), False)
3722
3723
Brett Cannon3f9183b2016-08-26 14:44:48 -07003724class PathTConverterTests(unittest.TestCase):
3725 # tuples of (function name, allows fd arguments, additional arguments to
3726 # function, cleanup function)
3727 functions = [
3728 ('stat', True, (), None),
3729 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003730 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003731 ('chflags', False, (0,), None),
3732 ('lchflags', False, (0,), None),
3733 ('open', False, (0,), getattr(os, 'close', None)),
3734 ]
3735
3736 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003737 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003738 if os.name == 'nt':
3739 bytes_fspath = bytes_filename = None
3740 else:
Serhiy Storchaka700cfa82020-06-25 17:56:31 +03003741 bytes_filename = os.fsencode(support.TESTFN)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003742 bytes_fspath = FakePath(bytes_filename)
3743 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003744 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003745 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003746
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003747 int_fspath = FakePath(fd)
3748 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003749
3750 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3751 with self.subTest(name=name):
3752 try:
3753 fn = getattr(os, name)
3754 except AttributeError:
3755 continue
3756
Brett Cannon8f96a302016-08-26 19:30:11 -07003757 for path in (str_filename, bytes_filename, str_fspath,
3758 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003759 if path is None:
3760 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003761 with self.subTest(name=name, path=path):
3762 result = fn(path, *extra_args)
3763 if cleanup_fn is not None:
3764 cleanup_fn(result)
3765
3766 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003767 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003768 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003769
3770 if allow_fd:
3771 result = fn(fd, *extra_args) # should not fail
3772 if cleanup_fn is not None:
3773 cleanup_fn(result)
3774 else:
3775 with self.assertRaisesRegex(
3776 TypeError,
3777 'os.PathLike'):
3778 fn(fd, *extra_args)
3779
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003780 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003781 msg = r'__fspath__\(\) to return str or bytes, not %s'
3782 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003783 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003784 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003785 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003786 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003787 os.stat(FakePath(object()))
3788
Brett Cannon3f9183b2016-08-26 14:44:48 -07003789
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003790@unittest.skipUnless(hasattr(os, 'get_blocking'),
3791 'needs os.get_blocking() and os.set_blocking()')
3792class BlockingTests(unittest.TestCase):
3793 def test_blocking(self):
3794 fd = os.open(__file__, os.O_RDONLY)
3795 self.addCleanup(os.close, fd)
3796 self.assertEqual(os.get_blocking(fd), True)
3797
3798 os.set_blocking(fd, False)
3799 self.assertEqual(os.get_blocking(fd), False)
3800
3801 os.set_blocking(fd, True)
3802 self.assertEqual(os.get_blocking(fd), True)
3803
3804
Yury Selivanov97e2e062014-09-26 12:33:06 -04003805
3806class ExportsTests(unittest.TestCase):
3807 def test_os_all(self):
3808 self.assertIn('open', os.__all__)
3809 self.assertIn('walk', os.__all__)
3810
3811
Eddie Elizondob3966632019-11-05 07:16:14 -08003812class TestDirEntry(unittest.TestCase):
3813 def setUp(self):
3814 self.path = os.path.realpath(support.TESTFN)
3815 self.addCleanup(support.rmtree, self.path)
3816 os.mkdir(self.path)
3817
3818 def test_uninstantiable(self):
3819 self.assertRaises(TypeError, os.DirEntry)
3820
3821 def test_unpickable(self):
3822 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3823 entry = [entry for entry in os.scandir(self.path)].pop()
3824 self.assertIsInstance(entry, os.DirEntry)
3825 self.assertEqual(entry.name, "file.txt")
3826 import pickle
3827 self.assertRaises(TypeError, pickle.dumps, entry, filename)
3828
3829
Victor Stinner6036e442015-03-08 01:58:04 +01003830class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003831 check_no_resource_warning = support.check_no_resource_warning
3832
Victor Stinner6036e442015-03-08 01:58:04 +01003833 def setUp(self):
3834 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003835 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003836 self.addCleanup(support.rmtree, self.path)
3837 os.mkdir(self.path)
3838
3839 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003840 path = self.bytes_path if isinstance(name, bytes) else self.path
3841 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003842 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003843 return filename
3844
3845 def get_entries(self, names):
3846 entries = dict((entry.name, entry)
3847 for entry in os.scandir(self.path))
3848 self.assertEqual(sorted(entries.keys()), names)
3849 return entries
3850
3851 def assert_stat_equal(self, stat1, stat2, skip_fields):
3852 if skip_fields:
3853 for attr in dir(stat1):
3854 if not attr.startswith("st_"):
3855 continue
3856 if attr in ("st_dev", "st_ino", "st_nlink"):
3857 continue
3858 self.assertEqual(getattr(stat1, attr),
3859 getattr(stat2, attr),
3860 (stat1, stat2, attr))
3861 else:
3862 self.assertEqual(stat1, stat2)
3863
Eddie Elizondob3966632019-11-05 07:16:14 -08003864 def test_uninstantiable(self):
3865 scandir_iter = os.scandir(self.path)
3866 self.assertRaises(TypeError, type(scandir_iter))
3867 scandir_iter.close()
3868
3869 def test_unpickable(self):
3870 filename = self.create_file("file.txt")
3871 scandir_iter = os.scandir(self.path)
3872 import pickle
3873 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3874 scandir_iter.close()
3875
Victor Stinner6036e442015-03-08 01:58:04 +01003876 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003877 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003878 self.assertEqual(entry.name, name)
3879 self.assertEqual(entry.path, os.path.join(self.path, name))
3880 self.assertEqual(entry.inode(),
3881 os.stat(entry.path, follow_symlinks=False).st_ino)
3882
3883 entry_stat = os.stat(entry.path)
3884 self.assertEqual(entry.is_dir(),
3885 stat.S_ISDIR(entry_stat.st_mode))
3886 self.assertEqual(entry.is_file(),
3887 stat.S_ISREG(entry_stat.st_mode))
3888 self.assertEqual(entry.is_symlink(),
3889 os.path.islink(entry.path))
3890
3891 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3892 self.assertEqual(entry.is_dir(follow_symlinks=False),
3893 stat.S_ISDIR(entry_lstat.st_mode))
3894 self.assertEqual(entry.is_file(follow_symlinks=False),
3895 stat.S_ISREG(entry_lstat.st_mode))
3896
3897 self.assert_stat_equal(entry.stat(),
3898 entry_stat,
3899 os.name == 'nt' and not is_symlink)
3900 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3901 entry_lstat,
3902 os.name == 'nt')
3903
3904 def test_attributes(self):
3905 link = hasattr(os, 'link')
3906 symlink = support.can_symlink()
3907
3908 dirname = os.path.join(self.path, "dir")
3909 os.mkdir(dirname)
3910 filename = self.create_file("file.txt")
3911 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003912 try:
3913 os.link(filename, os.path.join(self.path, "link_file.txt"))
3914 except PermissionError as e:
3915 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003916 if symlink:
3917 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3918 target_is_directory=True)
3919 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3920
3921 names = ['dir', 'file.txt']
3922 if link:
3923 names.append('link_file.txt')
3924 if symlink:
3925 names.extend(('symlink_dir', 'symlink_file.txt'))
3926 entries = self.get_entries(names)
3927
3928 entry = entries['dir']
3929 self.check_entry(entry, 'dir', True, False, False)
3930
3931 entry = entries['file.txt']
3932 self.check_entry(entry, 'file.txt', False, True, False)
3933
3934 if link:
3935 entry = entries['link_file.txt']
3936 self.check_entry(entry, 'link_file.txt', False, True, False)
3937
3938 if symlink:
3939 entry = entries['symlink_dir']
3940 self.check_entry(entry, 'symlink_dir', True, False, True)
3941
3942 entry = entries['symlink_file.txt']
3943 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3944
3945 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003946 path = self.bytes_path if isinstance(name, bytes) else self.path
3947 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003948 self.assertEqual(len(entries), 1)
3949
3950 entry = entries[0]
3951 self.assertEqual(entry.name, name)
3952 return entry
3953
Brett Cannon96881cd2016-06-10 14:37:21 -07003954 def create_file_entry(self, name='file.txt'):
3955 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003956 return self.get_entry(os.path.basename(filename))
3957
3958 def test_current_directory(self):
3959 filename = self.create_file()
3960 old_dir = os.getcwd()
3961 try:
3962 os.chdir(self.path)
3963
3964 # call scandir() without parameter: it must list the content
3965 # of the current directory
3966 entries = dict((entry.name, entry) for entry in os.scandir())
3967 self.assertEqual(sorted(entries.keys()),
3968 [os.path.basename(filename)])
3969 finally:
3970 os.chdir(old_dir)
3971
3972 def test_repr(self):
3973 entry = self.create_file_entry()
3974 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3975
Brett Cannon96881cd2016-06-10 14:37:21 -07003976 def test_fspath_protocol(self):
3977 entry = self.create_file_entry()
3978 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3979
3980 def test_fspath_protocol_bytes(self):
3981 bytes_filename = os.fsencode('bytesfile.txt')
3982 bytes_entry = self.create_file_entry(name=bytes_filename)
3983 fspath = os.fspath(bytes_entry)
3984 self.assertIsInstance(fspath, bytes)
3985 self.assertEqual(fspath,
3986 os.path.join(os.fsencode(self.path),bytes_filename))
3987
Victor Stinner6036e442015-03-08 01:58:04 +01003988 def test_removed_dir(self):
3989 path = os.path.join(self.path, 'dir')
3990
3991 os.mkdir(path)
3992 entry = self.get_entry('dir')
3993 os.rmdir(path)
3994
3995 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3996 if os.name == 'nt':
3997 self.assertTrue(entry.is_dir())
3998 self.assertFalse(entry.is_file())
3999 self.assertFalse(entry.is_symlink())
4000 if os.name == 'nt':
4001 self.assertRaises(FileNotFoundError, entry.inode)
4002 # don't fail
4003 entry.stat()
4004 entry.stat(follow_symlinks=False)
4005 else:
4006 self.assertGreater(entry.inode(), 0)
4007 self.assertRaises(FileNotFoundError, entry.stat)
4008 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4009
4010 def test_removed_file(self):
4011 entry = self.create_file_entry()
4012 os.unlink(entry.path)
4013
4014 self.assertFalse(entry.is_dir())
4015 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4016 if os.name == 'nt':
4017 self.assertTrue(entry.is_file())
4018 self.assertFalse(entry.is_symlink())
4019 if os.name == 'nt':
4020 self.assertRaises(FileNotFoundError, entry.inode)
4021 # don't fail
4022 entry.stat()
4023 entry.stat(follow_symlinks=False)
4024 else:
4025 self.assertGreater(entry.inode(), 0)
4026 self.assertRaises(FileNotFoundError, entry.stat)
4027 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4028
4029 def test_broken_symlink(self):
4030 if not support.can_symlink():
4031 return self.skipTest('cannot create symbolic link')
4032
4033 filename = self.create_file("file.txt")
4034 os.symlink(filename,
4035 os.path.join(self.path, "symlink.txt"))
4036 entries = self.get_entries(['file.txt', 'symlink.txt'])
4037 entry = entries['symlink.txt']
4038 os.unlink(filename)
4039
4040 self.assertGreater(entry.inode(), 0)
4041 self.assertFalse(entry.is_dir())
4042 self.assertFalse(entry.is_file()) # broken symlink returns False
4043 self.assertFalse(entry.is_dir(follow_symlinks=False))
4044 self.assertFalse(entry.is_file(follow_symlinks=False))
4045 self.assertTrue(entry.is_symlink())
4046 self.assertRaises(FileNotFoundError, entry.stat)
4047 # don't fail
4048 entry.stat(follow_symlinks=False)
4049
4050 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01004051 self.create_file("file.txt")
4052
4053 path_bytes = os.fsencode(self.path)
4054 entries = list(os.scandir(path_bytes))
4055 self.assertEqual(len(entries), 1, entries)
4056 entry = entries[0]
4057
4058 self.assertEqual(entry.name, b'file.txt')
4059 self.assertEqual(entry.path,
4060 os.fsencode(os.path.join(self.path, 'file.txt')))
4061
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03004062 def test_bytes_like(self):
4063 self.create_file("file.txt")
4064
4065 for cls in bytearray, memoryview:
4066 path_bytes = cls(os.fsencode(self.path))
4067 with self.assertWarns(DeprecationWarning):
4068 entries = list(os.scandir(path_bytes))
4069 self.assertEqual(len(entries), 1, entries)
4070 entry = entries[0]
4071
4072 self.assertEqual(entry.name, b'file.txt')
4073 self.assertEqual(entry.path,
4074 os.fsencode(os.path.join(self.path, 'file.txt')))
4075 self.assertIs(type(entry.name), bytes)
4076 self.assertIs(type(entry.path), bytes)
4077
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004078 @unittest.skipUnless(os.listdir in os.supports_fd,
4079 'fd support for listdir required for this test.')
4080 def test_fd(self):
4081 self.assertIn(os.scandir, os.supports_fd)
4082 self.create_file('file.txt')
4083 expected_names = ['file.txt']
4084 if support.can_symlink():
4085 os.symlink('file.txt', os.path.join(self.path, 'link'))
4086 expected_names.append('link')
4087
4088 fd = os.open(self.path, os.O_RDONLY)
4089 try:
4090 with os.scandir(fd) as it:
4091 entries = list(it)
4092 names = [entry.name for entry in entries]
4093 self.assertEqual(sorted(names), expected_names)
4094 self.assertEqual(names, os.listdir(fd))
4095 for entry in entries:
4096 self.assertEqual(entry.path, entry.name)
4097 self.assertEqual(os.fspath(entry), entry.name)
4098 self.assertEqual(entry.is_symlink(), entry.name == 'link')
4099 if os.stat in os.supports_dir_fd:
4100 st = os.stat(entry.name, dir_fd=fd)
4101 self.assertEqual(entry.stat(), st)
4102 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4103 self.assertEqual(entry.stat(follow_symlinks=False), st)
4104 finally:
4105 os.close(fd)
4106
Victor Stinner6036e442015-03-08 01:58:04 +01004107 def test_empty_path(self):
4108 self.assertRaises(FileNotFoundError, os.scandir, '')
4109
4110 def test_consume_iterator_twice(self):
4111 self.create_file("file.txt")
4112 iterator = os.scandir(self.path)
4113
4114 entries = list(iterator)
4115 self.assertEqual(len(entries), 1, entries)
4116
4117 # check than consuming the iterator twice doesn't raise exception
4118 entries2 = list(iterator)
4119 self.assertEqual(len(entries2), 0, entries2)
4120
4121 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004122 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01004123 self.assertRaises(TypeError, os.scandir, obj)
4124
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004125 def test_close(self):
4126 self.create_file("file.txt")
4127 self.create_file("file2.txt")
4128 iterator = os.scandir(self.path)
4129 next(iterator)
4130 iterator.close()
4131 # multiple closes
4132 iterator.close()
4133 with self.check_no_resource_warning():
4134 del iterator
4135
4136 def test_context_manager(self):
4137 self.create_file("file.txt")
4138 self.create_file("file2.txt")
4139 with os.scandir(self.path) as iterator:
4140 next(iterator)
4141 with self.check_no_resource_warning():
4142 del iterator
4143
4144 def test_context_manager_close(self):
4145 self.create_file("file.txt")
4146 self.create_file("file2.txt")
4147 with os.scandir(self.path) as iterator:
4148 next(iterator)
4149 iterator.close()
4150
4151 def test_context_manager_exception(self):
4152 self.create_file("file.txt")
4153 self.create_file("file2.txt")
4154 with self.assertRaises(ZeroDivisionError):
4155 with os.scandir(self.path) as iterator:
4156 next(iterator)
4157 1/0
4158 with self.check_no_resource_warning():
4159 del iterator
4160
4161 def test_resource_warning(self):
4162 self.create_file("file.txt")
4163 self.create_file("file2.txt")
4164 iterator = os.scandir(self.path)
4165 next(iterator)
4166 with self.assertWarns(ResourceWarning):
4167 del iterator
4168 support.gc_collect()
4169 # exhausted iterator
4170 iterator = os.scandir(self.path)
4171 list(iterator)
4172 with self.check_no_resource_warning():
4173 del iterator
4174
Victor Stinner6036e442015-03-08 01:58:04 +01004175
Ethan Furmancdc08792016-06-02 15:06:09 -07004176class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004177
4178 # Abstracted so it can be overridden to test pure Python implementation
4179 # if a C version is provided.
4180 fspath = staticmethod(os.fspath)
4181
Ethan Furmancdc08792016-06-02 15:06:09 -07004182 def test_return_bytes(self):
4183 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004184 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004185
4186 def test_return_string(self):
4187 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004188 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004189
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004190 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004191 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004192 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004193
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004194 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004195 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4196 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4197
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004198 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004199 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4200 self.assertTrue(issubclass(FakePath, os.PathLike))
4201 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004202
Ethan Furmancdc08792016-06-02 15:06:09 -07004203 def test_garbage_in_exception_out(self):
4204 vapor = type('blah', (), {})
4205 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004206 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004207
4208 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004209 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004210
Brett Cannon044283a2016-07-15 10:41:49 -07004211 def test_bad_pathlike(self):
4212 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004213 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004214 # __fspath__ attribute that is not callable.
4215 c = type('foo', (), {})
4216 c.__fspath__ = 1
4217 self.assertRaises(TypeError, self.fspath, c())
4218 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004219 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004220 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004221
Bar Hareleae87e32019-12-22 11:57:27 +02004222 def test_pathlike_subclasshook(self):
4223 # bpo-38878: subclasshook causes subclass checks
4224 # true on abstract implementation.
4225 class A(os.PathLike):
4226 pass
4227 self.assertFalse(issubclass(FakePath, A))
4228 self.assertTrue(issubclass(FakePath, os.PathLike))
4229
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004230 def test_pathlike_class_getitem(self):
Guido van Rossum48b069a2020-04-07 09:50:06 -07004231 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004232
Victor Stinnerc29b5852017-11-02 07:28:27 -07004233
4234class TimesTests(unittest.TestCase):
4235 def test_times(self):
4236 times = os.times()
4237 self.assertIsInstance(times, os.times_result)
4238
4239 for field in ('user', 'system', 'children_user', 'children_system',
4240 'elapsed'):
4241 value = getattr(times, field)
4242 self.assertIsInstance(value, float)
4243
4244 if os.name == 'nt':
4245 self.assertEqual(times.children_user, 0)
4246 self.assertEqual(times.children_system, 0)
4247 self.assertEqual(times.elapsed, 0)
4248
4249
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004250# Only test if the C version is provided, otherwise TestPEP519 already tested
4251# the pure Python implementation.
4252if hasattr(os, "_fspath"):
4253 class TestPEP519PurePython(TestPEP519):
4254
4255 """Explicitly test the pure Python implementation of os.fspath()."""
4256
4257 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004258
4259
Fred Drake2e2be372001-09-20 21:33:42 +00004260if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004261 unittest.main()