blob: dbdc00c2fea2295534326aa9f165969b3f24f185 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
R David Murrayf2ad1732014-12-25 18:36:56 -050060root_in_posix = False
61if hasattr(os, 'geteuid'):
62 root_in_posix = (os.geteuid() == 0)
63
Mark Dickinson7cf03892010-04-16 13:45:35 +000064# Detect whether we're on a Linux system that uses the (now outdated
65# and unmaintained) linuxthreads threading library. There's an issue
66# when combining linuxthreads with a failed execv call: see
67# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020068if hasattr(sys, 'thread_info') and sys.thread_info.version:
69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
70else:
71 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000072
Stefan Krahebee49a2013-01-17 15:31:00 +010073# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
Berker Peksag4af23d72016-09-15 20:32:44 +030077def requires_os_func(name):
78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
79
80
Victor Stinnerae39d232016-03-24 17:12:55 +010081def create_file(filename, content=b'content'):
82 with open(filename, "xb", 0) as fp:
83 fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
151class FileTests(unittest.TestCase):
152 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000153 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000155 tearDown = setUp
156
157 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000159 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000160 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161
Christian Heimesfdab48e2008-01-20 09:06:41 +0000162 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
164 # We must allocate two consecutive file descriptors, otherwise
165 # it will mess up other file descriptors (perhaps even the three
166 # standard ones).
167 second = os.dup(first)
168 try:
169 retries = 0
170 while second != first + 1:
171 os.close(first)
172 retries += 1
173 if retries > 10:
174 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000175 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000176 first, second = second, os.dup(second)
177 finally:
178 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000179 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000180 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000181 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000182
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000183 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000184 def test_rename(self):
185 path = support.TESTFN
186 old = sys.getrefcount(path)
187 self.assertRaises(TypeError, os.rename, path, 0)
188 new = sys.getrefcount(path)
189 self.assertEqual(old, new)
190
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000191 def test_read(self):
192 with open(support.TESTFN, "w+b") as fobj:
193 fobj.write(b"spam")
194 fobj.flush()
195 fd = fobj.fileno()
196 os.lseek(fd, 0, 0)
197 s = os.read(fd, 4)
198 self.assertEqual(type(s), bytes)
199 self.assertEqual(s, b"spam")
200
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200201 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200202 # Skip the test on 32-bit platforms: the number of bytes must fit in a
203 # Py_ssize_t type
204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
205 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
207 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200208 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200210
211 # Issue #21932: Make sure that os.read() does not raise an
212 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213 with open(support.TESTFN, "rb") as fp:
214 data = os.read(fp.fileno(), size)
215
Victor Stinner8c663fd2017-11-08 14:44:44 -0800216 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200217 # operating system is free to return less bytes than requested.
218 self.assertEqual(data, b'test')
219
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000220 def test_write(self):
221 # os.write() accepts bytes- and buffer-like objects but not strings
222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
223 self.assertRaises(TypeError, os.write, fd, "beans")
224 os.write(fd, b"bacon\n")
225 os.write(fd, bytearray(b"eggs\n"))
226 os.write(fd, memoryview(b"spam\n"))
227 os.close(fd)
228 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000229 self.assertEqual(fobj.read().splitlines(),
230 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000231
Victor Stinnere0daff12011-03-20 23:36:35 +0100232 def write_windows_console(self, *args):
233 retcode = subprocess.call(args,
234 # use a new console to not flood the test output
235 creationflags=subprocess.CREATE_NEW_CONSOLE,
236 # use a shell to hide the console window (SW_HIDE)
237 shell=True)
238 self.assertEqual(retcode, 0)
239
240 @unittest.skipUnless(sys.platform == 'win32',
241 'test specific to the Windows console')
242 def test_write_windows_console(self):
243 # Issue #11395: the Windows console returns an error (12: not enough
244 # space error) on writing into stdout if stdout mode is binary and the
245 # length is greater than 66,000 bytes (or less, depending on heap
246 # usage).
247 code = "print('x' * 100000)"
248 self.write_windows_console(sys.executable, "-c", code)
249 self.write_windows_console(sys.executable, "-u", "-c", code)
250
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000251 def fdopen_helper(self, *args):
252 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253 f = os.fdopen(fd, *args)
254 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000255
256 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
258 os.close(fd)
259
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000260 self.fdopen_helper()
261 self.fdopen_helper('r')
262 self.fdopen_helper('r', 100)
263
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100264 def test_replace(self):
265 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100266 self.addCleanup(support.unlink, support.TESTFN)
267 self.addCleanup(support.unlink, TESTFN2)
268
269 create_file(support.TESTFN, b"1")
270 create_file(TESTFN2, b"2")
271
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100272 os.replace(support.TESTFN, TESTFN2)
273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
274 with open(TESTFN2, 'r') as f:
275 self.assertEqual(f.read(), "1")
276
Martin Panterbf19d162015-09-09 01:01:13 +0000277 def test_open_keywords(self):
278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
279 dir_fd=None)
280 os.close(f)
281
282 def test_symlink_keywords(self):
283 symlink = support.get_attribute(os, "symlink")
284 try:
285 symlink(src='target', dst=support.TESTFN,
286 target_is_directory=False, dir_fd=None)
287 except (NotImplementedError, OSError):
288 pass # No OS support or unprivileged user
289
Pablo Galindoaac4d032019-05-31 19:39:47 +0100290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
291 def test_copy_file_range_invalid_values(self):
292 with self.assertRaises(ValueError):
293 os.copy_file_range(0, 1, -10)
294
295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
296 def test_copy_file_range(self):
297 TESTFN2 = support.TESTFN + ".3"
298 data = b'0123456789'
299
300 create_file(support.TESTFN, data)
301 self.addCleanup(support.unlink, support.TESTFN)
302
303 in_file = open(support.TESTFN, 'rb')
304 self.addCleanup(in_file.close)
305 in_fd = in_file.fileno()
306
307 out_file = open(TESTFN2, 'w+b')
308 self.addCleanup(support.unlink, TESTFN2)
309 self.addCleanup(out_file.close)
310 out_fd = out_file.fileno()
311
312 try:
313 i = os.copy_file_range(in_fd, out_fd, 5)
314 except OSError as e:
315 # Handle the case in which Python was compiled
316 # in a system with the syscall but without support
317 # in the kernel.
318 if e.errno != errno.ENOSYS:
319 raise
320 self.skipTest(e)
321 else:
322 # The number of copied bytes can be less than
323 # the number of bytes originally requested.
324 self.assertIn(i, range(0, 6));
325
326 with open(TESTFN2, 'rb') as in_file:
327 self.assertEqual(in_file.read(), data[:i])
328
329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
330 def test_copy_file_range_offset(self):
331 TESTFN4 = support.TESTFN + ".4"
332 data = b'0123456789'
333 bytes_to_copy = 6
334 in_skip = 3
335 out_seek = 5
336
337 create_file(support.TESTFN, data)
338 self.addCleanup(support.unlink, support.TESTFN)
339
340 in_file = open(support.TESTFN, 'rb')
341 self.addCleanup(in_file.close)
342 in_fd = in_file.fileno()
343
344 out_file = open(TESTFN4, 'w+b')
345 self.addCleanup(support.unlink, TESTFN4)
346 self.addCleanup(out_file.close)
347 out_fd = out_file.fileno()
348
349 try:
350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
351 offset_src=in_skip,
352 offset_dst=out_seek)
353 except OSError as e:
354 # Handle the case in which Python was compiled
355 # in a system with the syscall but without support
356 # in the kernel.
357 if e.errno != errno.ENOSYS:
358 raise
359 self.skipTest(e)
360 else:
361 # The number of copied bytes can be less than
362 # the number of bytes originally requested.
363 self.assertIn(i, range(0, bytes_to_copy+1));
364
365 with open(TESTFN4, 'rb') as in_file:
366 read = in_file.read()
367 # seeked bytes (5) are zero'ed
368 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
369 # 012 are skipped (in_skip)
370 # 345678 are copied in the file (in_skip + bytes_to_copy)
371 self.assertEqual(read[out_seek:],
372 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Steve Dower772ec0f2019-09-04 14:42:54 -0700594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
595 def test_stat_block_device(self):
596 # bpo-38030: os.stat fails for block devices
597 # Test a filename like "//./C:"
598 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
599 result = os.stat(fname)
600 self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
723 def _test_utime_current(self, set_time):
724 # Get the system clock
725 current = time.time()
726
727 # Call os.utime() to set the timestamp to the current system clock
728 set_time(self.fname)
729
730 if not self.support_subsecond(self.fname):
731 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700732 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200733 # On Windows, the usual resolution of time.time() is 15.6 ms.
734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 #
736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
737 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200738 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200739 st = os.stat(self.fname)
740 msg = ("st_time=%r, current=%r, dt=%r"
741 % (st.st_mtime, current, st.st_mtime - current))
742 self.assertAlmostEqual(st.st_mtime, current,
743 delta=delta, msg=msg)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
757 def get_file_system(self, path):
758 if sys.platform == 'win32':
759 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
760 import ctypes
761 kernel32 = ctypes.windll.kernel32
762 buf = ctypes.create_unicode_buffer("", 100)
763 ok = kernel32.GetVolumeInformationW(root, None, 0,
764 None, None, None,
765 buf, len(buf))
766 if ok:
767 return buf.value
768 # return None if the filesystem is unknown
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000827class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000828 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000829 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000830
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000831 def setUp(self):
832 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000834 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000835 for key, value in self._reference().items():
836 os.environ[key] = value
837
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000838 def tearDown(self):
839 os.environ.clear()
840 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000841 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000842 os.environb.clear()
843 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000844
Christian Heimes90333392007-11-01 19:08:42 +0000845 def _reference(self):
846 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
847
848 def _empty_mapping(self):
849 os.environ.clear()
850 return os.environ
851
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000852 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200853 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
854 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000855 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000856 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300857 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200858 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300859 value = popen.read().strip()
860 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000861
Xavier de Gayed1415312016-07-22 12:15:29 +0200862 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
863 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000864 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200865 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
866 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300867 it = iter(popen)
868 self.assertEqual(next(it), "line1\n")
869 self.assertEqual(next(it), "line2\n")
870 self.assertEqual(next(it), "line3\n")
871 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000872
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000873 # Verify environ keys and values from the OS are of the
874 # correct str type.
875 def test_keyvalue_types(self):
876 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000877 self.assertEqual(type(key), str)
878 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000879
Christian Heimes90333392007-11-01 19:08:42 +0000880 def test_items(self):
881 for key, value in self._reference().items():
882 self.assertEqual(os.environ.get(key), value)
883
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000884 # Issue 7310
885 def test___repr__(self):
886 """Check that the repr() of os.environ looks like environ({...})."""
887 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000888 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
889 '{!r}: {!r}'.format(key, value)
890 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000891
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000892 def test_get_exec_path(self):
893 defpath_list = os.defpath.split(os.pathsep)
894 test_path = ['/monty', '/python', '', '/flying/circus']
895 test_env = {'PATH': os.pathsep.join(test_path)}
896
897 saved_environ = os.environ
898 try:
899 os.environ = dict(test_env)
900 # Test that defaulting to os.environ works.
901 self.assertSequenceEqual(test_path, os.get_exec_path())
902 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
903 finally:
904 os.environ = saved_environ
905
906 # No PATH environment variable
907 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
908 # Empty PATH environment variable
909 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
910 # Supplied PATH environment variable
911 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
912
Victor Stinnerb745a742010-05-18 17:17:23 +0000913 if os.supports_bytes_environ:
914 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000915 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000916 # ignore BytesWarning warning
917 with warnings.catch_warnings(record=True):
918 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000919 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000920 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000921 pass
922 else:
923 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000924
925 # bytes key and/or value
926 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
927 ['abc'])
928 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
929 ['abc'])
930 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
931 ['abc'])
932
933 @unittest.skipUnless(os.supports_bytes_environ,
934 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000935 def test_environb(self):
936 # os.environ -> os.environb
937 value = 'euro\u20ac'
938 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000939 value_bytes = value.encode(sys.getfilesystemencoding(),
940 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000941 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000942 msg = "U+20AC character is not encodable to %s" % (
943 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000944 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000945 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(os.environ['unicode'], value)
947 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000948
949 # os.environb -> os.environ
950 value = b'\xff'
951 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000953 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000954 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000955
Victor Stinner161e7b32020-01-24 11:53:44 +0100956 @unittest.skipUnless(hasattr(os, 'putenv'), "Test needs os.putenv()")
957 @unittest.skipUnless(hasattr(os, 'unsetenv'), "Test needs os.unsetenv()")
958 def test_putenv_unsetenv(self):
959 name = "PYTHONTESTVAR"
960 value = "testvalue"
961 code = f'import os; print(repr(os.environ.get({name!r})))'
962
963 with support.EnvironmentVarGuard() as env:
964 env.pop(name, None)
965
966 os.putenv(name, value)
967 proc = subprocess.run([sys.executable, '-c', code], check=True,
968 stdout=subprocess.PIPE, text=True)
969 self.assertEqual(proc.stdout.rstrip(), repr(value))
970
971 os.unsetenv(name)
972 proc = subprocess.run([sys.executable, '-c', code], check=True,
973 stdout=subprocess.PIPE, text=True)
974 self.assertEqual(proc.stdout.rstrip(), repr(None))
975
Victor Stinner13ff2452018-01-22 18:32:50 +0100976 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100977 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +0100978 @unittest.skipUnless(hasattr(os, 'putenv'), "Test needs os.putenv()")
979 @unittest.skipUnless(hasattr(os, 'unsetenv'), "Test needs os.unsetenv()")
980 def test_putenv_unsetenv_error(self):
981 # Empty variable name is invalid.
982 # "=" and null character are not allowed in a variable name.
983 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
984 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
985 self.assertRaises((OSError, ValueError), os.unsetenv, name)
986
Victor Stinnerb73dd022020-01-22 21:11:17 +0100987 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +0100988 # On Windows, an environment variable string ("name=value" string)
989 # is limited to 32,767 characters
990 longstr = 'x' * 32_768
991 self.assertRaises(ValueError, os.putenv, longstr, "1")
992 self.assertRaises(ValueError, os.putenv, "X", longstr)
993 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +0100994
Victor Stinner6d101392013-04-14 16:35:04 +0200995 def test_key_type(self):
996 missing = 'missingkey'
997 self.assertNotIn(missing, os.environ)
998
Victor Stinner839e5ea2013-04-14 16:43:03 +0200999 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001000 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001001 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001002 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001003
Victor Stinner839e5ea2013-04-14 16:43:03 +02001004 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001005 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001006 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001007 self.assertTrue(cm.exception.__suppress_context__)
1008
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001009 def _test_environ_iteration(self, collection):
1010 iterator = iter(collection)
1011 new_key = "__new_key__"
1012
1013 next(iterator) # start iteration over os.environ.items
1014
1015 # add a new key in os.environ mapping
1016 os.environ[new_key] = "test_environ_iteration"
1017
1018 try:
1019 next(iterator) # force iteration over modified mapping
1020 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1021 finally:
1022 del os.environ[new_key]
1023
1024 def test_iter_error_when_changing_os_environ(self):
1025 self._test_environ_iteration(os.environ)
1026
1027 def test_iter_error_when_changing_os_environ_items(self):
1028 self._test_environ_iteration(os.environ.items())
1029
1030 def test_iter_error_when_changing_os_environ_values(self):
1031 self._test_environ_iteration(os.environ.values())
1032
Victor Stinner6d101392013-04-14 16:35:04 +02001033
Tim Petersc4e09402003-04-25 07:11:48 +00001034class WalkTests(unittest.TestCase):
1035 """Tests for os.walk()."""
1036
Victor Stinner0561c532015-03-12 10:28:24 +01001037 # Wrapper to hide minor differences between os.walk and os.fwalk
1038 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001039 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001040 if 'follow_symlinks' in kwargs:
1041 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001042 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001043
Charles-François Natali7372b062012-02-05 15:15:38 +01001044 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001045 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001046 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001047
1048 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001049 # TESTFN/
1050 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001051 # tmp1
1052 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001053 # tmp2
1054 # SUB11/ no kids
1055 # SUB2/ a file kid and a dirsymlink kid
1056 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001057 # SUB21/ not readable
1058 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001059 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001060 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001061 # broken_link2
1062 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001063 # TEST2/
1064 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001065 self.walk_path = join(support.TESTFN, "TEST1")
1066 self.sub1_path = join(self.walk_path, "SUB1")
1067 self.sub11_path = join(self.sub1_path, "SUB11")
1068 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001069 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001070 tmp1_path = join(self.walk_path, "tmp1")
1071 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001072 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001073 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001074 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001075 t2_path = join(support.TESTFN, "TEST2")
1076 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001077 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001078 broken_link2_path = join(sub2_path, "broken_link2")
1079 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001080
1081 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001082 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001083 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001084 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001085 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001086
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001087 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001088 with open(path, "x") as f:
1089 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001090
Victor Stinner0561c532015-03-12 10:28:24 +01001091 if support.can_symlink():
1092 os.symlink(os.path.abspath(t2_path), self.link_path)
1093 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001094 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1095 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001096 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001097 ["broken_link", "broken_link2", "broken_link3",
1098 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001099 else:
pxinwr3e028b22019-02-15 13:04:47 +08001100 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001101
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001102 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001103 try:
1104 os.listdir(sub21_path)
1105 except PermissionError:
1106 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1107 else:
1108 os.chmod(sub21_path, stat.S_IRWXU)
1109 os.unlink(tmp5_path)
1110 os.rmdir(sub21_path)
1111 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001112
Victor Stinner0561c532015-03-12 10:28:24 +01001113 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001114 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001115 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001116
Tim Petersc4e09402003-04-25 07:11:48 +00001117 self.assertEqual(len(all), 4)
1118 # We can't know which order SUB1 and SUB2 will appear in.
1119 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1120 # flipped: TESTFN, SUB2, SUB1, SUB11
1121 flipped = all[0][1][0] != "SUB1"
1122 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001123 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001124 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001125 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1126 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1127 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1128 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001129
Brett Cannon3f9183b2016-08-26 14:44:48 -07001130 def test_walk_prune(self, walk_path=None):
1131 if walk_path is None:
1132 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001133 # Prune the search.
1134 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001135 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001136 all.append((root, dirs, files))
1137 # Don't descend into SUB1.
1138 if 'SUB1' in dirs:
1139 # Note that this also mutates the dirs we appended to all!
1140 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001141
Victor Stinner0561c532015-03-12 10:28:24 +01001142 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001143 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001144
1145 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001146 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001147 self.assertEqual(all[1], self.sub2_tree)
1148
Brett Cannon3f9183b2016-08-26 14:44:48 -07001149 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001150 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001151
Victor Stinner0561c532015-03-12 10:28:24 +01001152 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001153 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001154 all = list(self.walk(self.walk_path, topdown=False))
1155
Victor Stinner53b0a412016-03-26 01:12:36 +01001156 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001157 # We can't know which order SUB1 and SUB2 will appear in.
1158 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1159 # flipped: SUB2, SUB11, SUB1, TESTFN
1160 flipped = all[3][1][0] != "SUB1"
1161 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001162 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001163 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001164 self.assertEqual(all[3],
1165 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1166 self.assertEqual(all[flipped],
1167 (self.sub11_path, [], []))
1168 self.assertEqual(all[flipped + 1],
1169 (self.sub1_path, ["SUB11"], ["tmp2"]))
1170 self.assertEqual(all[2 - 2 * flipped],
1171 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001172
Victor Stinner0561c532015-03-12 10:28:24 +01001173 def test_walk_symlink(self):
1174 if not support.can_symlink():
1175 self.skipTest("need symlink support")
1176
1177 # Walk, following symlinks.
1178 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1179 for root, dirs, files in walk_it:
1180 if root == self.link_path:
1181 self.assertEqual(dirs, [])
1182 self.assertEqual(files, ["tmp4"])
1183 break
1184 else:
1185 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001186
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001187 def test_walk_bad_dir(self):
1188 # Walk top-down.
1189 errors = []
1190 walk_it = self.walk(self.walk_path, onerror=errors.append)
1191 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001192 self.assertEqual(errors, [])
1193 dir1 = 'SUB1'
1194 path1 = os.path.join(root, dir1)
1195 path1new = os.path.join(root, dir1 + '.new')
1196 os.rename(path1, path1new)
1197 try:
1198 roots = [r for r, d, f in walk_it]
1199 self.assertTrue(errors)
1200 self.assertNotIn(path1, roots)
1201 self.assertNotIn(path1new, roots)
1202 for dir2 in dirs:
1203 if dir2 != dir1:
1204 self.assertIn(os.path.join(root, dir2), roots)
1205 finally:
1206 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001207
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001208 def test_walk_many_open_files(self):
1209 depth = 30
1210 base = os.path.join(support.TESTFN, 'deep')
1211 p = os.path.join(base, *(['d']*depth))
1212 os.makedirs(p)
1213
1214 iters = [self.walk(base, topdown=False) for j in range(100)]
1215 for i in range(depth + 1):
1216 expected = (p, ['d'] if i else [], [])
1217 for it in iters:
1218 self.assertEqual(next(it), expected)
1219 p = os.path.dirname(p)
1220
1221 iters = [self.walk(base, topdown=True) for j in range(100)]
1222 p = base
1223 for i in range(depth + 1):
1224 expected = (p, ['d'] if i < depth else [], [])
1225 for it in iters:
1226 self.assertEqual(next(it), expected)
1227 p = os.path.join(p, 'd')
1228
Charles-François Natali7372b062012-02-05 15:15:38 +01001229
1230@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1231class FwalkTests(WalkTests):
1232 """Tests for os.fwalk()."""
1233
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001234 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001235 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001236 yield (root, dirs, files)
1237
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001238 def fwalk(self, *args, **kwargs):
1239 return os.fwalk(*args, **kwargs)
1240
Larry Hastingsc48fe982012-06-25 04:49:05 -07001241 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1242 """
1243 compare with walk() results.
1244 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001245 walk_kwargs = walk_kwargs.copy()
1246 fwalk_kwargs = fwalk_kwargs.copy()
1247 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1248 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1249 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001250
Charles-François Natali7372b062012-02-05 15:15:38 +01001251 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001252 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001253 expected[root] = (set(dirs), set(files))
1254
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001255 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001256 self.assertIn(root, expected)
1257 self.assertEqual(expected[root], (set(dirs), set(files)))
1258
Larry Hastingsc48fe982012-06-25 04:49:05 -07001259 def test_compare_to_walk(self):
1260 kwargs = {'top': support.TESTFN}
1261 self._compare_to_walk(kwargs, kwargs)
1262
Charles-François Natali7372b062012-02-05 15:15:38 +01001263 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001264 try:
1265 fd = os.open(".", os.O_RDONLY)
1266 walk_kwargs = {'top': support.TESTFN}
1267 fwalk_kwargs = walk_kwargs.copy()
1268 fwalk_kwargs['dir_fd'] = fd
1269 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1270 finally:
1271 os.close(fd)
1272
1273 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001274 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001275 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1276 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001277 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001278 # check that the FD is valid
1279 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001280 # redundant check
1281 os.stat(rootfd)
1282 # check that listdir() returns consistent information
1283 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001284
1285 def test_fd_leak(self):
1286 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1287 # we both check that calling fwalk() a large number of times doesn't
1288 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1289 minfd = os.dup(1)
1290 os.close(minfd)
1291 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001292 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001293 pass
1294 newfd = os.dup(1)
1295 self.addCleanup(os.close, newfd)
1296 self.assertEqual(newfd, minfd)
1297
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001298 # fwalk() keeps file descriptors open
1299 test_walk_many_open_files = None
1300
1301
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001302class BytesWalkTests(WalkTests):
1303 """Tests for os.walk() with bytes."""
1304 def walk(self, top, **kwargs):
1305 if 'follow_symlinks' in kwargs:
1306 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1307 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1308 root = os.fsdecode(broot)
1309 dirs = list(map(os.fsdecode, bdirs))
1310 files = list(map(os.fsdecode, bfiles))
1311 yield (root, dirs, files)
1312 bdirs[:] = list(map(os.fsencode, dirs))
1313 bfiles[:] = list(map(os.fsencode, files))
1314
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001315@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1316class BytesFwalkTests(FwalkTests):
1317 """Tests for os.walk() with bytes."""
1318 def fwalk(self, top='.', *args, **kwargs):
1319 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1320 root = os.fsdecode(broot)
1321 dirs = list(map(os.fsdecode, bdirs))
1322 files = list(map(os.fsdecode, bfiles))
1323 yield (root, dirs, files, topfd)
1324 bdirs[:] = list(map(os.fsencode, dirs))
1325 bfiles[:] = list(map(os.fsencode, files))
1326
Charles-François Natali7372b062012-02-05 15:15:38 +01001327
Guido van Rossume7ba4952007-06-06 23:52:48 +00001328class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001329 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001330 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001331
1332 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001333 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001334 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1335 os.makedirs(path) # Should work
1336 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1337 os.makedirs(path)
1338
1339 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001340 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001341 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1342 os.makedirs(path)
1343 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1344 'dir5', 'dir6')
1345 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001346
Serhiy Storchakae304e332017-03-24 13:27:42 +02001347 def test_mode(self):
1348 with support.temp_umask(0o002):
1349 base = support.TESTFN
1350 parent = os.path.join(base, 'dir1')
1351 path = os.path.join(parent, 'dir2')
1352 os.makedirs(path, 0o555)
1353 self.assertTrue(os.path.exists(path))
1354 self.assertTrue(os.path.isdir(path))
1355 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001356 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1357 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001358
Terry Reedy5a22b652010-12-02 07:05:56 +00001359 def test_exist_ok_existing_directory(self):
1360 path = os.path.join(support.TESTFN, 'dir1')
1361 mode = 0o777
1362 old_mask = os.umask(0o022)
1363 os.makedirs(path, mode)
1364 self.assertRaises(OSError, os.makedirs, path, mode)
1365 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001366 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001367 os.makedirs(path, mode=mode, exist_ok=True)
1368 os.umask(old_mask)
1369
Martin Pantera82642f2015-11-19 04:48:44 +00001370 # Issue #25583: A drive root could raise PermissionError on Windows
1371 os.makedirs(os.path.abspath('/'), exist_ok=True)
1372
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001373 def test_exist_ok_s_isgid_directory(self):
1374 path = os.path.join(support.TESTFN, 'dir1')
1375 S_ISGID = stat.S_ISGID
1376 mode = 0o777
1377 old_mask = os.umask(0o022)
1378 try:
1379 existing_testfn_mode = stat.S_IMODE(
1380 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001381 try:
1382 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001383 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001384 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001385 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1386 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1387 # The os should apply S_ISGID from the parent dir for us, but
1388 # this test need not depend on that behavior. Be explicit.
1389 os.makedirs(path, mode | S_ISGID)
1390 # http://bugs.python.org/issue14992
1391 # Should not fail when the bit is already set.
1392 os.makedirs(path, mode, exist_ok=True)
1393 # remove the bit.
1394 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001395 # May work even when the bit is not already set when demanded.
1396 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001397 finally:
1398 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001399
1400 def test_exist_ok_existing_regular_file(self):
1401 base = support.TESTFN
1402 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001403 with open(path, 'w') as f:
1404 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001405 self.assertRaises(OSError, os.makedirs, path)
1406 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1407 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1408 os.remove(path)
1409
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001410 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001411 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001412 'dir4', 'dir5', 'dir6')
1413 # If the tests failed, the bottom-most directory ('../dir6')
1414 # may not have been created, so we look for the outermost directory
1415 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001416 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001417 path = os.path.dirname(path)
1418
1419 os.removedirs(path)
1420
Andrew Svetlov405faed2012-12-25 12:18:09 +02001421
R David Murrayf2ad1732014-12-25 18:36:56 -05001422@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1423class ChownFileTests(unittest.TestCase):
1424
Berker Peksag036a71b2015-07-21 09:29:48 +03001425 @classmethod
1426 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001427 os.mkdir(support.TESTFN)
1428
1429 def test_chown_uid_gid_arguments_must_be_index(self):
1430 stat = os.stat(support.TESTFN)
1431 uid = stat.st_uid
1432 gid = stat.st_gid
1433 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1434 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1435 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1436 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1437 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1438
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001439 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1440 def test_chown_gid(self):
1441 groups = os.getgroups()
1442 if len(groups) < 2:
1443 self.skipTest("test needs at least 2 groups")
1444
R David Murrayf2ad1732014-12-25 18:36:56 -05001445 gid_1, gid_2 = groups[:2]
1446 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001447
R David Murrayf2ad1732014-12-25 18:36:56 -05001448 os.chown(support.TESTFN, uid, gid_1)
1449 gid = os.stat(support.TESTFN).st_gid
1450 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001451
R David Murrayf2ad1732014-12-25 18:36:56 -05001452 os.chown(support.TESTFN, uid, gid_2)
1453 gid = os.stat(support.TESTFN).st_gid
1454 self.assertEqual(gid, gid_2)
1455
1456 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1457 "test needs root privilege and more than one user")
1458 def test_chown_with_root(self):
1459 uid_1, uid_2 = all_users[:2]
1460 gid = os.stat(support.TESTFN).st_gid
1461 os.chown(support.TESTFN, uid_1, gid)
1462 uid = os.stat(support.TESTFN).st_uid
1463 self.assertEqual(uid, uid_1)
1464 os.chown(support.TESTFN, uid_2, gid)
1465 uid = os.stat(support.TESTFN).st_uid
1466 self.assertEqual(uid, uid_2)
1467
1468 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1469 "test needs non-root account and more than one user")
1470 def test_chown_without_permission(self):
1471 uid_1, uid_2 = all_users[:2]
1472 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001473 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001474 os.chown(support.TESTFN, uid_1, gid)
1475 os.chown(support.TESTFN, uid_2, gid)
1476
Berker Peksag036a71b2015-07-21 09:29:48 +03001477 @classmethod
1478 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001479 os.rmdir(support.TESTFN)
1480
1481
Andrew Svetlov405faed2012-12-25 12:18:09 +02001482class RemoveDirsTests(unittest.TestCase):
1483 def setUp(self):
1484 os.makedirs(support.TESTFN)
1485
1486 def tearDown(self):
1487 support.rmtree(support.TESTFN)
1488
1489 def test_remove_all(self):
1490 dira = os.path.join(support.TESTFN, 'dira')
1491 os.mkdir(dira)
1492 dirb = os.path.join(dira, 'dirb')
1493 os.mkdir(dirb)
1494 os.removedirs(dirb)
1495 self.assertFalse(os.path.exists(dirb))
1496 self.assertFalse(os.path.exists(dira))
1497 self.assertFalse(os.path.exists(support.TESTFN))
1498
1499 def test_remove_partial(self):
1500 dira = os.path.join(support.TESTFN, 'dira')
1501 os.mkdir(dira)
1502 dirb = os.path.join(dira, 'dirb')
1503 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001504 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001505 os.removedirs(dirb)
1506 self.assertFalse(os.path.exists(dirb))
1507 self.assertTrue(os.path.exists(dira))
1508 self.assertTrue(os.path.exists(support.TESTFN))
1509
1510 def test_remove_nothing(self):
1511 dira = os.path.join(support.TESTFN, 'dira')
1512 os.mkdir(dira)
1513 dirb = os.path.join(dira, 'dirb')
1514 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001515 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001516 with self.assertRaises(OSError):
1517 os.removedirs(dirb)
1518 self.assertTrue(os.path.exists(dirb))
1519 self.assertTrue(os.path.exists(dira))
1520 self.assertTrue(os.path.exists(support.TESTFN))
1521
1522
Guido van Rossume7ba4952007-06-06 23:52:48 +00001523class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001524 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001525 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001526 f.write(b'hello')
1527 f.close()
1528 with open(os.devnull, 'rb') as f:
1529 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001530
Andrew Svetlov405faed2012-12-25 12:18:09 +02001531
Guido van Rossume7ba4952007-06-06 23:52:48 +00001532class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001533 def test_urandom_length(self):
1534 self.assertEqual(len(os.urandom(0)), 0)
1535 self.assertEqual(len(os.urandom(1)), 1)
1536 self.assertEqual(len(os.urandom(10)), 10)
1537 self.assertEqual(len(os.urandom(100)), 100)
1538 self.assertEqual(len(os.urandom(1000)), 1000)
1539
1540 def test_urandom_value(self):
1541 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001542 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001543 data2 = os.urandom(16)
1544 self.assertNotEqual(data1, data2)
1545
1546 def get_urandom_subprocess(self, count):
1547 code = '\n'.join((
1548 'import os, sys',
1549 'data = os.urandom(%s)' % count,
1550 'sys.stdout.buffer.write(data)',
1551 'sys.stdout.buffer.flush()'))
1552 out = assert_python_ok('-c', code)
1553 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001554 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001555 return stdout
1556
1557 def test_urandom_subprocess(self):
1558 data1 = self.get_urandom_subprocess(16)
1559 data2 = self.get_urandom_subprocess(16)
1560 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001561
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001562
Victor Stinner9b1f4742016-09-06 16:18:52 -07001563@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1564class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001565 @classmethod
1566 def setUpClass(cls):
1567 try:
1568 os.getrandom(1)
1569 except OSError as exc:
1570 if exc.errno == errno.ENOSYS:
1571 # Python compiled on a more recent Linux version
1572 # than the current Linux kernel
1573 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1574 else:
1575 raise
1576
Victor Stinner9b1f4742016-09-06 16:18:52 -07001577 def test_getrandom_type(self):
1578 data = os.getrandom(16)
1579 self.assertIsInstance(data, bytes)
1580 self.assertEqual(len(data), 16)
1581
1582 def test_getrandom0(self):
1583 empty = os.getrandom(0)
1584 self.assertEqual(empty, b'')
1585
1586 def test_getrandom_random(self):
1587 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1588
1589 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1590 # resource /dev/random
1591
1592 def test_getrandom_nonblock(self):
1593 # The call must not fail. Check also that the flag exists
1594 try:
1595 os.getrandom(1, os.GRND_NONBLOCK)
1596 except BlockingIOError:
1597 # System urandom is not initialized yet
1598 pass
1599
1600 def test_getrandom_value(self):
1601 data1 = os.getrandom(16)
1602 data2 = os.getrandom(16)
1603 self.assertNotEqual(data1, data2)
1604
1605
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001606# os.urandom() doesn't use a file descriptor when it is implemented with the
1607# getentropy() function, the getrandom() function or the getrandom() syscall
1608OS_URANDOM_DONT_USE_FD = (
1609 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1610 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1611 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001612
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001613@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1614 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001615@unittest.skipIf(sys.platform == "vxworks",
1616 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001617class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001618 @unittest.skipUnless(resource, "test requires the resource module")
1619 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001620 # Check urandom() failing when it is not able to open /dev/random.
1621 # We spawn a new process to make the test more robust (if getrlimit()
1622 # failed to restore the file descriptor limit after this, the whole
1623 # test suite would crash; this actually happened on the OS X Tiger
1624 # buildbot).
1625 code = """if 1:
1626 import errno
1627 import os
1628 import resource
1629
1630 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1631 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1632 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001633 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001634 except OSError as e:
1635 assert e.errno == errno.EMFILE, e.errno
1636 else:
1637 raise AssertionError("OSError not raised")
1638 """
1639 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001640
Antoine Pitroue472aea2014-04-26 14:33:03 +02001641 def test_urandom_fd_closed(self):
1642 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1643 # closed.
1644 code = """if 1:
1645 import os
1646 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001647 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001648 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001649 with test.support.SuppressCrashReport():
1650 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001651 sys.stdout.buffer.write(os.urandom(4))
1652 """
1653 rc, out, err = assert_python_ok('-Sc', code)
1654
1655 def test_urandom_fd_reopened(self):
1656 # Issue #21207: urandom() should detect its fd to /dev/urandom
1657 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001658 self.addCleanup(support.unlink, support.TESTFN)
1659 create_file(support.TESTFN, b"x" * 256)
1660
Antoine Pitroue472aea2014-04-26 14:33:03 +02001661 code = """if 1:
1662 import os
1663 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001664 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001665 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001666 with test.support.SuppressCrashReport():
1667 for fd in range(3, 256):
1668 try:
1669 os.close(fd)
1670 except OSError:
1671 pass
1672 else:
1673 # Found the urandom fd (XXX hopefully)
1674 break
1675 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001676 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001677 new_fd = f.fileno()
1678 # Issue #26935: posix allows new_fd and fd to be equal but
1679 # some libc implementations have dup2 return an error in this
1680 # case.
1681 if new_fd != fd:
1682 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001683 sys.stdout.buffer.write(os.urandom(4))
1684 sys.stdout.buffer.write(os.urandom(4))
1685 """.format(TESTFN=support.TESTFN)
1686 rc, out, err = assert_python_ok('-Sc', code)
1687 self.assertEqual(len(out), 8)
1688 self.assertNotEqual(out[0:4], out[4:8])
1689 rc, out2, err2 = assert_python_ok('-Sc', code)
1690 self.assertEqual(len(out2), 8)
1691 self.assertNotEqual(out2, out)
1692
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001693
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001694@contextlib.contextmanager
1695def _execvpe_mockup(defpath=None):
1696 """
1697 Stubs out execv and execve functions when used as context manager.
1698 Records exec calls. The mock execv and execve functions always raise an
1699 exception as they would normally never return.
1700 """
1701 # A list of tuples containing (function name, first arg, args)
1702 # of calls to execv or execve that have been made.
1703 calls = []
1704
1705 def mock_execv(name, *args):
1706 calls.append(('execv', name, args))
1707 raise RuntimeError("execv called")
1708
1709 def mock_execve(name, *args):
1710 calls.append(('execve', name, args))
1711 raise OSError(errno.ENOTDIR, "execve called")
1712
1713 try:
1714 orig_execv = os.execv
1715 orig_execve = os.execve
1716 orig_defpath = os.defpath
1717 os.execv = mock_execv
1718 os.execve = mock_execve
1719 if defpath is not None:
1720 os.defpath = defpath
1721 yield calls
1722 finally:
1723 os.execv = orig_execv
1724 os.execve = orig_execve
1725 os.defpath = orig_defpath
1726
pxinwrf2d7ac72019-05-21 18:46:37 +08001727@unittest.skipUnless(hasattr(os, 'execv'),
1728 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001729class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001730 @unittest.skipIf(USING_LINUXTHREADS,
1731 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001732 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001733 self.assertRaises(OSError, os.execvpe, 'no such app-',
1734 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001735
Steve Dowerbce26262016-11-19 19:17:26 -08001736 def test_execv_with_bad_arglist(self):
1737 self.assertRaises(ValueError, os.execv, 'notepad', ())
1738 self.assertRaises(ValueError, os.execv, 'notepad', [])
1739 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1740 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1741
Thomas Heller6790d602007-08-30 17:15:14 +00001742 def test_execvpe_with_bad_arglist(self):
1743 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001744 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1745 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001746
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001747 @unittest.skipUnless(hasattr(os, '_execvpe'),
1748 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001749 def _test_internal_execvpe(self, test_type):
1750 program_path = os.sep + 'absolutepath'
1751 if test_type is bytes:
1752 program = b'executable'
1753 fullpath = os.path.join(os.fsencode(program_path), program)
1754 native_fullpath = fullpath
1755 arguments = [b'progname', 'arg1', 'arg2']
1756 else:
1757 program = 'executable'
1758 arguments = ['progname', 'arg1', 'arg2']
1759 fullpath = os.path.join(program_path, program)
1760 if os.name != "nt":
1761 native_fullpath = os.fsencode(fullpath)
1762 else:
1763 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001764 env = {'spam': 'beans'}
1765
Victor Stinnerb745a742010-05-18 17:17:23 +00001766 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001767 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001768 self.assertRaises(RuntimeError,
1769 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001770 self.assertEqual(len(calls), 1)
1771 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1772
Victor Stinnerb745a742010-05-18 17:17:23 +00001773 # test os._execvpe() with a relative path:
1774 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001775 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001776 self.assertRaises(OSError,
1777 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001778 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001779 self.assertSequenceEqual(calls[0],
1780 ('execve', native_fullpath, (arguments, env)))
1781
1782 # test os._execvpe() with a relative path:
1783 # os.get_exec_path() reads the 'PATH' variable
1784 with _execvpe_mockup() as calls:
1785 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001786 if test_type is bytes:
1787 env_path[b'PATH'] = program_path
1788 else:
1789 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001790 self.assertRaises(OSError,
1791 os._execvpe, program, arguments, env=env_path)
1792 self.assertEqual(len(calls), 1)
1793 self.assertSequenceEqual(calls[0],
1794 ('execve', native_fullpath, (arguments, env_path)))
1795
1796 def test_internal_execvpe_str(self):
1797 self._test_internal_execvpe(str)
1798 if os.name != "nt":
1799 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001800
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001801 def test_execve_invalid_env(self):
1802 args = [sys.executable, '-c', 'pass']
1803
Ville Skyttä49b27342017-08-03 09:00:59 +03001804 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001805 newenv = os.environ.copy()
1806 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1807 with self.assertRaises(ValueError):
1808 os.execve(args[0], args, newenv)
1809
Ville Skyttä49b27342017-08-03 09:00:59 +03001810 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001811 newenv = os.environ.copy()
1812 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1813 with self.assertRaises(ValueError):
1814 os.execve(args[0], args, newenv)
1815
Ville Skyttä49b27342017-08-03 09:00:59 +03001816 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001817 newenv = os.environ.copy()
1818 newenv["FRUIT=ORANGE"] = "lemon"
1819 with self.assertRaises(ValueError):
1820 os.execve(args[0], args, newenv)
1821
Alexey Izbyshev83460312018-10-20 03:28:22 +03001822 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1823 def test_execve_with_empty_path(self):
1824 # bpo-32890: Check GetLastError() misuse
1825 try:
1826 os.execve('', ['arg'], {})
1827 except OSError as e:
1828 self.assertTrue(e.winerror is None or e.winerror != 0)
1829 else:
1830 self.fail('No OSError raised')
1831
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001832
Serhiy Storchaka43767632013-11-03 21:31:38 +02001833@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001834class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001835 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001836 try:
1837 os.stat(support.TESTFN)
1838 except FileNotFoundError:
1839 exists = False
1840 except OSError as exc:
1841 exists = True
1842 self.fail("file %s must not exist; os.stat failed with %s"
1843 % (support.TESTFN, exc))
1844 else:
1845 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001846
Thomas Wouters477c8d52006-05-27 19:21:47 +00001847 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001848 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001849
1850 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001851 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001852
1853 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001854 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001855
1856 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001857 self.addCleanup(support.unlink, support.TESTFN)
1858
Victor Stinnere77c9742016-03-25 10:28:23 +01001859 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001860 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001861
1862 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001863 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001864
Thomas Wouters477c8d52006-05-27 19:21:47 +00001865 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001866 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001867
Victor Stinnere77c9742016-03-25 10:28:23 +01001868
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001869class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001870 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001871 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1872 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001873 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001874 def get_single(f):
1875 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001876 if hasattr(os, f):
1877 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001878 return helper
1879 for f in singles:
1880 locals()["test_"+f] = get_single(f)
1881
Benjamin Peterson7522c742009-01-19 21:00:09 +00001882 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001883 try:
1884 f(support.make_bad_fd(), *args)
1885 except OSError as e:
1886 self.assertEqual(e.errno, errno.EBADF)
1887 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001888 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001889 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001890
Serhiy Storchaka43767632013-11-03 21:31:38 +02001891 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001892 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001893 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001894
Serhiy Storchaka43767632013-11-03 21:31:38 +02001895 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001896 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001897 fd = support.make_bad_fd()
1898 # Make sure none of the descriptors we are about to close are
1899 # currently valid (issue 6542).
1900 for i in range(10):
1901 try: os.fstat(fd+i)
1902 except OSError:
1903 pass
1904 else:
1905 break
1906 if i < 2:
1907 raise unittest.SkipTest(
1908 "Unable to acquire a range of invalid file descriptors")
1909 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001910
Serhiy Storchaka43767632013-11-03 21:31:38 +02001911 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001912 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001913 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001914
Serhiy Storchaka43767632013-11-03 21:31:38 +02001915 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001916 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001917 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001918
Serhiy Storchaka43767632013-11-03 21:31:38 +02001919 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001920 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001921 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001922
Serhiy Storchaka43767632013-11-03 21:31:38 +02001923 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001924 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001925 self.check(os.pathconf, "PC_NAME_MAX")
1926 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001927
Serhiy Storchaka43767632013-11-03 21:31:38 +02001928 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001929 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001930 self.check(os.truncate, 0)
1931 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001932
Serhiy Storchaka43767632013-11-03 21:31:38 +02001933 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001934 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001935 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001936
Serhiy Storchaka43767632013-11-03 21:31:38 +02001937 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001938 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001939 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001940
Victor Stinner57ddf782014-01-08 15:21:28 +01001941 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1942 def test_readv(self):
1943 buf = bytearray(10)
1944 self.check(os.readv, [buf])
1945
Serhiy Storchaka43767632013-11-03 21:31:38 +02001946 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001947 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001948 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001949
Serhiy Storchaka43767632013-11-03 21:31:38 +02001950 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001951 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001952 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001953
Victor Stinner57ddf782014-01-08 15:21:28 +01001954 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1955 def test_writev(self):
1956 self.check(os.writev, [b'abc'])
1957
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001958 def test_inheritable(self):
1959 self.check(os.get_inheritable)
1960 self.check(os.set_inheritable, True)
1961
1962 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1963 'needs os.get_blocking() and os.set_blocking()')
1964 def test_blocking(self):
1965 self.check(os.get_blocking)
1966 self.check(os.set_blocking, True)
1967
Brian Curtin1b9df392010-11-24 20:24:31 +00001968
1969class LinkTests(unittest.TestCase):
1970 def setUp(self):
1971 self.file1 = support.TESTFN
1972 self.file2 = os.path.join(support.TESTFN + "2")
1973
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001974 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001975 for file in (self.file1, self.file2):
1976 if os.path.exists(file):
1977 os.unlink(file)
1978
Brian Curtin1b9df392010-11-24 20:24:31 +00001979 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001980 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001981
xdegaye6a55d092017-11-12 17:57:04 +01001982 try:
1983 os.link(file1, file2)
1984 except PermissionError as e:
1985 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001986 with open(file1, "r") as f1, open(file2, "r") as f2:
1987 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1988
1989 def test_link(self):
1990 self._test_link(self.file1, self.file2)
1991
1992 def test_link_bytes(self):
1993 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1994 bytes(self.file2, sys.getfilesystemencoding()))
1995
Brian Curtinf498b752010-11-30 15:54:04 +00001996 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001997 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001998 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001999 except UnicodeError:
2000 raise unittest.SkipTest("Unable to encode for this platform.")
2001
Brian Curtinf498b752010-11-30 15:54:04 +00002002 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002003 self.file2 = self.file1 + "2"
2004 self._test_link(self.file1, self.file2)
2005
Serhiy Storchaka43767632013-11-03 21:31:38 +02002006@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2007class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002008 # uid_t and gid_t are 32-bit unsigned integers on Linux
2009 UID_OVERFLOW = (1 << 32)
2010 GID_OVERFLOW = (1 << 32)
2011
Serhiy Storchaka43767632013-11-03 21:31:38 +02002012 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2013 def test_setuid(self):
2014 if os.getuid() != 0:
2015 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002016 self.assertRaises(TypeError, os.setuid, 'not an int')
2017 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002018
Serhiy Storchaka43767632013-11-03 21:31:38 +02002019 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2020 def test_setgid(self):
2021 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2022 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002023 self.assertRaises(TypeError, os.setgid, 'not an int')
2024 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002025
Serhiy Storchaka43767632013-11-03 21:31:38 +02002026 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2027 def test_seteuid(self):
2028 if os.getuid() != 0:
2029 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002030 self.assertRaises(TypeError, os.setegid, 'not an int')
2031 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002032
Serhiy Storchaka43767632013-11-03 21:31:38 +02002033 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2034 def test_setegid(self):
2035 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2036 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002037 self.assertRaises(TypeError, os.setegid, 'not an int')
2038 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002039
Serhiy Storchaka43767632013-11-03 21:31:38 +02002040 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2041 def test_setreuid(self):
2042 if os.getuid() != 0:
2043 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002044 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2045 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2046 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2047 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002048
Serhiy Storchaka43767632013-11-03 21:31:38 +02002049 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2050 def test_setreuid_neg1(self):
2051 # Needs to accept -1. We run this in a subprocess to avoid
2052 # altering the test runner's process state (issue8045).
2053 subprocess.check_call([
2054 sys.executable, '-c',
2055 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002056
Serhiy Storchaka43767632013-11-03 21:31:38 +02002057 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2058 def test_setregid(self):
2059 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2060 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002061 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2062 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2063 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2064 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002065
Serhiy Storchaka43767632013-11-03 21:31:38 +02002066 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2067 def test_setregid_neg1(self):
2068 # Needs to accept -1. We run this in a subprocess to avoid
2069 # altering the test runner's process state (issue8045).
2070 subprocess.check_call([
2071 sys.executable, '-c',
2072 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002073
Serhiy Storchaka43767632013-11-03 21:31:38 +02002074@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2075class Pep383Tests(unittest.TestCase):
2076 def setUp(self):
2077 if support.TESTFN_UNENCODABLE:
2078 self.dir = support.TESTFN_UNENCODABLE
2079 elif support.TESTFN_NONASCII:
2080 self.dir = support.TESTFN_NONASCII
2081 else:
2082 self.dir = support.TESTFN
2083 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002084
Serhiy Storchaka43767632013-11-03 21:31:38 +02002085 bytesfn = []
2086 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002087 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002088 fn = os.fsencode(fn)
2089 except UnicodeEncodeError:
2090 return
2091 bytesfn.append(fn)
2092 add_filename(support.TESTFN_UNICODE)
2093 if support.TESTFN_UNENCODABLE:
2094 add_filename(support.TESTFN_UNENCODABLE)
2095 if support.TESTFN_NONASCII:
2096 add_filename(support.TESTFN_NONASCII)
2097 if not bytesfn:
2098 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002099
Serhiy Storchaka43767632013-11-03 21:31:38 +02002100 self.unicodefn = set()
2101 os.mkdir(self.dir)
2102 try:
2103 for fn in bytesfn:
2104 support.create_empty_file(os.path.join(self.bdir, fn))
2105 fn = os.fsdecode(fn)
2106 if fn in self.unicodefn:
2107 raise ValueError("duplicate filename")
2108 self.unicodefn.add(fn)
2109 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002110 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002111 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002112
Serhiy Storchaka43767632013-11-03 21:31:38 +02002113 def tearDown(self):
2114 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002115
Serhiy Storchaka43767632013-11-03 21:31:38 +02002116 def test_listdir(self):
2117 expected = self.unicodefn
2118 found = set(os.listdir(self.dir))
2119 self.assertEqual(found, expected)
2120 # test listdir without arguments
2121 current_directory = os.getcwd()
2122 try:
2123 os.chdir(os.sep)
2124 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2125 finally:
2126 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002127
Serhiy Storchaka43767632013-11-03 21:31:38 +02002128 def test_open(self):
2129 for fn in self.unicodefn:
2130 f = open(os.path.join(self.dir, fn), 'rb')
2131 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002132
Serhiy Storchaka43767632013-11-03 21:31:38 +02002133 @unittest.skipUnless(hasattr(os, 'statvfs'),
2134 "need os.statvfs()")
2135 def test_statvfs(self):
2136 # issue #9645
2137 for fn in self.unicodefn:
2138 # should not fail with file not found error
2139 fullname = os.path.join(self.dir, fn)
2140 os.statvfs(fullname)
2141
2142 def test_stat(self):
2143 for fn in self.unicodefn:
2144 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002145
Brian Curtineb24d742010-04-12 17:16:38 +00002146@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2147class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002148 def _kill(self, sig):
2149 # Start sys.executable as a subprocess and communicate from the
2150 # subprocess to the parent that the interpreter is ready. When it
2151 # becomes ready, send *sig* via os.kill to the subprocess and check
2152 # that the return code is equal to *sig*.
2153 import ctypes
2154 from ctypes import wintypes
2155 import msvcrt
2156
2157 # Since we can't access the contents of the process' stdout until the
2158 # process has exited, use PeekNamedPipe to see what's inside stdout
2159 # without waiting. This is done so we can tell that the interpreter
2160 # is started and running at a point where it could handle a signal.
2161 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2162 PeekNamedPipe.restype = wintypes.BOOL
2163 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2164 ctypes.POINTER(ctypes.c_char), # stdout buf
2165 wintypes.DWORD, # Buffer size
2166 ctypes.POINTER(wintypes.DWORD), # bytes read
2167 ctypes.POINTER(wintypes.DWORD), # bytes avail
2168 ctypes.POINTER(wintypes.DWORD)) # bytes left
2169 msg = "running"
2170 proc = subprocess.Popen([sys.executable, "-c",
2171 "import sys;"
2172 "sys.stdout.write('{}');"
2173 "sys.stdout.flush();"
2174 "input()".format(msg)],
2175 stdout=subprocess.PIPE,
2176 stderr=subprocess.PIPE,
2177 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002178 self.addCleanup(proc.stdout.close)
2179 self.addCleanup(proc.stderr.close)
2180 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002181
2182 count, max = 0, 100
2183 while count < max and proc.poll() is None:
2184 # Create a string buffer to store the result of stdout from the pipe
2185 buf = ctypes.create_string_buffer(len(msg))
2186 # Obtain the text currently in proc.stdout
2187 # Bytes read/avail/left are left as NULL and unused
2188 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2189 buf, ctypes.sizeof(buf), None, None, None)
2190 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2191 if buf.value:
2192 self.assertEqual(msg, buf.value.decode())
2193 break
2194 time.sleep(0.1)
2195 count += 1
2196 else:
2197 self.fail("Did not receive communication from the subprocess")
2198
Brian Curtineb24d742010-04-12 17:16:38 +00002199 os.kill(proc.pid, sig)
2200 self.assertEqual(proc.wait(), sig)
2201
2202 def test_kill_sigterm(self):
2203 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002204 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002205
2206 def test_kill_int(self):
2207 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002208 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002209
2210 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002211 tagname = "test_os_%s" % uuid.uuid1()
2212 m = mmap.mmap(-1, 1, tagname)
2213 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002214 # Run a script which has console control handling enabled.
2215 proc = subprocess.Popen([sys.executable,
2216 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002217 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002218 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2219 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002220 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002221 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002222 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002223 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002224 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002225 count += 1
2226 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002227 # Forcefully kill the process if we weren't able to signal it.
2228 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002229 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002230 os.kill(proc.pid, event)
2231 # proc.send_signal(event) could also be done here.
2232 # Allow time for the signal to be passed and the process to exit.
2233 time.sleep(0.5)
2234 if not proc.poll():
2235 # Forcefully kill the process if we weren't able to signal it.
2236 os.kill(proc.pid, signal.SIGINT)
2237 self.fail("subprocess did not stop on {}".format(name))
2238
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002239 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002240 def test_CTRL_C_EVENT(self):
2241 from ctypes import wintypes
2242 import ctypes
2243
2244 # Make a NULL value by creating a pointer with no argument.
2245 NULL = ctypes.POINTER(ctypes.c_int)()
2246 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2247 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2248 wintypes.BOOL)
2249 SetConsoleCtrlHandler.restype = wintypes.BOOL
2250
2251 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002252 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002253 # by subprocesses.
2254 SetConsoleCtrlHandler(NULL, 0)
2255
2256 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2257
2258 def test_CTRL_BREAK_EVENT(self):
2259 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2260
2261
Brian Curtind40e6f72010-07-08 21:39:08 +00002262@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002263class Win32ListdirTests(unittest.TestCase):
2264 """Test listdir on Windows."""
2265
2266 def setUp(self):
2267 self.created_paths = []
2268 for i in range(2):
2269 dir_name = 'SUB%d' % i
2270 dir_path = os.path.join(support.TESTFN, dir_name)
2271 file_name = 'FILE%d' % i
2272 file_path = os.path.join(support.TESTFN, file_name)
2273 os.makedirs(dir_path)
2274 with open(file_path, 'w') as f:
2275 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2276 self.created_paths.extend([dir_name, file_name])
2277 self.created_paths.sort()
2278
2279 def tearDown(self):
2280 shutil.rmtree(support.TESTFN)
2281
2282 def test_listdir_no_extended_path(self):
2283 """Test when the path is not an "extended" path."""
2284 # unicode
2285 self.assertEqual(
2286 sorted(os.listdir(support.TESTFN)),
2287 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002288
Tim Golden781bbeb2013-10-25 20:24:06 +01002289 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002290 self.assertEqual(
2291 sorted(os.listdir(os.fsencode(support.TESTFN))),
2292 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002293
2294 def test_listdir_extended_path(self):
2295 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002296 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002297 # unicode
2298 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2299 self.assertEqual(
2300 sorted(os.listdir(path)),
2301 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002302
Tim Golden781bbeb2013-10-25 20:24:06 +01002303 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002304 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2305 self.assertEqual(
2306 sorted(os.listdir(path)),
2307 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002308
2309
Berker Peksage0b5b202018-08-15 13:03:41 +03002310@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2311class ReadlinkTests(unittest.TestCase):
2312 filelink = 'readlinktest'
2313 filelink_target = os.path.abspath(__file__)
2314 filelinkb = os.fsencode(filelink)
2315 filelinkb_target = os.fsencode(filelink_target)
2316
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002317 def assertPathEqual(self, left, right):
2318 left = os.path.normcase(left)
2319 right = os.path.normcase(right)
2320 if sys.platform == 'win32':
2321 # Bad practice to blindly strip the prefix as it may be required to
2322 # correctly refer to the file, but we're only comparing paths here.
2323 has_prefix = lambda p: p.startswith(
2324 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2325 if has_prefix(left):
2326 left = left[4:]
2327 if has_prefix(right):
2328 right = right[4:]
2329 self.assertEqual(left, right)
2330
Berker Peksage0b5b202018-08-15 13:03:41 +03002331 def setUp(self):
2332 self.assertTrue(os.path.exists(self.filelink_target))
2333 self.assertTrue(os.path.exists(self.filelinkb_target))
2334 self.assertFalse(os.path.exists(self.filelink))
2335 self.assertFalse(os.path.exists(self.filelinkb))
2336
2337 def test_not_symlink(self):
2338 filelink_target = FakePath(self.filelink_target)
2339 self.assertRaises(OSError, os.readlink, self.filelink_target)
2340 self.assertRaises(OSError, os.readlink, filelink_target)
2341
2342 def test_missing_link(self):
2343 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2344 self.assertRaises(FileNotFoundError, os.readlink,
2345 FakePath('missing-link'))
2346
2347 @support.skip_unless_symlink
2348 def test_pathlike(self):
2349 os.symlink(self.filelink_target, self.filelink)
2350 self.addCleanup(support.unlink, self.filelink)
2351 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002352 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002353
2354 @support.skip_unless_symlink
2355 def test_pathlike_bytes(self):
2356 os.symlink(self.filelinkb_target, self.filelinkb)
2357 self.addCleanup(support.unlink, self.filelinkb)
2358 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002359 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002360 self.assertIsInstance(path, bytes)
2361
2362 @support.skip_unless_symlink
2363 def test_bytes(self):
2364 os.symlink(self.filelinkb_target, self.filelinkb)
2365 self.addCleanup(support.unlink, self.filelinkb)
2366 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002367 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002368 self.assertIsInstance(path, bytes)
2369
2370
Tim Golden781bbeb2013-10-25 20:24:06 +01002371@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002372@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002373class Win32SymlinkTests(unittest.TestCase):
2374 filelink = 'filelinktest'
2375 filelink_target = os.path.abspath(__file__)
2376 dirlink = 'dirlinktest'
2377 dirlink_target = os.path.dirname(filelink_target)
2378 missing_link = 'missing link'
2379
2380 def setUp(self):
2381 assert os.path.exists(self.dirlink_target)
2382 assert os.path.exists(self.filelink_target)
2383 assert not os.path.exists(self.dirlink)
2384 assert not os.path.exists(self.filelink)
2385 assert not os.path.exists(self.missing_link)
2386
2387 def tearDown(self):
2388 if os.path.exists(self.filelink):
2389 os.remove(self.filelink)
2390 if os.path.exists(self.dirlink):
2391 os.rmdir(self.dirlink)
2392 if os.path.lexists(self.missing_link):
2393 os.remove(self.missing_link)
2394
2395 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002396 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002397 self.assertTrue(os.path.exists(self.dirlink))
2398 self.assertTrue(os.path.isdir(self.dirlink))
2399 self.assertTrue(os.path.islink(self.dirlink))
2400 self.check_stat(self.dirlink, self.dirlink_target)
2401
2402 def test_file_link(self):
2403 os.symlink(self.filelink_target, self.filelink)
2404 self.assertTrue(os.path.exists(self.filelink))
2405 self.assertTrue(os.path.isfile(self.filelink))
2406 self.assertTrue(os.path.islink(self.filelink))
2407 self.check_stat(self.filelink, self.filelink_target)
2408
2409 def _create_missing_dir_link(self):
2410 'Create a "directory" link to a non-existent target'
2411 linkname = self.missing_link
2412 if os.path.lexists(linkname):
2413 os.remove(linkname)
2414 target = r'c:\\target does not exist.29r3c740'
2415 assert not os.path.exists(target)
2416 target_is_dir = True
2417 os.symlink(target, linkname, target_is_dir)
2418
2419 def test_remove_directory_link_to_missing_target(self):
2420 self._create_missing_dir_link()
2421 # For compatibility with Unix, os.remove will check the
2422 # directory status and call RemoveDirectory if the symlink
2423 # was created with target_is_dir==True.
2424 os.remove(self.missing_link)
2425
Brian Curtind40e6f72010-07-08 21:39:08 +00002426 def test_isdir_on_directory_link_to_missing_target(self):
2427 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002428 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002429
Brian Curtind40e6f72010-07-08 21:39:08 +00002430 def test_rmdir_on_directory_link_to_missing_target(self):
2431 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002432 os.rmdir(self.missing_link)
2433
2434 def check_stat(self, link, target):
2435 self.assertEqual(os.stat(link), os.stat(target))
2436 self.assertNotEqual(os.lstat(link), os.stat(link))
2437
Brian Curtind25aef52011-06-13 15:16:04 -05002438 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002439 self.assertEqual(os.stat(bytes_link), os.stat(target))
2440 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002441
2442 def test_12084(self):
2443 level1 = os.path.abspath(support.TESTFN)
2444 level2 = os.path.join(level1, "level2")
2445 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002446 self.addCleanup(support.rmtree, level1)
2447
2448 os.mkdir(level1)
2449 os.mkdir(level2)
2450 os.mkdir(level3)
2451
2452 file1 = os.path.abspath(os.path.join(level1, "file1"))
2453 create_file(file1)
2454
2455 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002456 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002457 os.chdir(level2)
2458 link = os.path.join(level2, "link")
2459 os.symlink(os.path.relpath(file1), "link")
2460 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002461
Victor Stinnerae39d232016-03-24 17:12:55 +01002462 # Check os.stat calls from the same dir as the link
2463 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002464
Victor Stinnerae39d232016-03-24 17:12:55 +01002465 # Check os.stat calls from a dir below the link
2466 os.chdir(level1)
2467 self.assertEqual(os.stat(file1),
2468 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002469
Victor Stinnerae39d232016-03-24 17:12:55 +01002470 # Check os.stat calls from a dir above the link
2471 os.chdir(level3)
2472 self.assertEqual(os.stat(file1),
2473 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002474 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002475 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002476
SSE43c34aad2018-02-13 00:10:35 +07002477 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2478 and os.path.exists(r'C:\ProgramData'),
2479 'Test directories not found')
2480 def test_29248(self):
2481 # os.symlink() calls CreateSymbolicLink, which creates
2482 # the reparse data buffer with the print name stored
2483 # first, so the offset is always 0. CreateSymbolicLink
2484 # stores the "PrintName" DOS path (e.g. "C:\") first,
2485 # with an offset of 0, followed by the "SubstituteName"
2486 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2487 # the other hand, seems to have been created manually
2488 # with an inverted order.
2489 target = os.readlink(r'C:\Users\All Users')
2490 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2491
Steve Dower6921e732018-03-05 14:26:08 -08002492 def test_buffer_overflow(self):
2493 # Older versions would have a buffer overflow when detecting
2494 # whether a link source was a directory. This test ensures we
2495 # no longer crash, but does not otherwise validate the behavior
2496 segment = 'X' * 27
2497 path = os.path.join(*[segment] * 10)
2498 test_cases = [
2499 # overflow with absolute src
2500 ('\\' + path, segment),
2501 # overflow dest with relative src
2502 (segment, path),
2503 # overflow when joining src
2504 (path[:180], path[:180]),
2505 ]
2506 for src, dest in test_cases:
2507 try:
2508 os.symlink(src, dest)
2509 except FileNotFoundError:
2510 pass
2511 else:
2512 try:
2513 os.remove(dest)
2514 except OSError:
2515 pass
2516 # Also test with bytes, since that is a separate code path.
2517 try:
2518 os.symlink(os.fsencode(src), os.fsencode(dest))
2519 except FileNotFoundError:
2520 pass
2521 else:
2522 try:
2523 os.remove(dest)
2524 except OSError:
2525 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002526
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002527 def test_appexeclink(self):
2528 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002529 if not os.path.isdir(root):
2530 self.skipTest("test requires a WindowsApps directory")
2531
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002532 aliases = [os.path.join(root, a)
2533 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2534
2535 for alias in aliases:
2536 if support.verbose:
2537 print()
2538 print("Testing with", alias)
2539 st = os.lstat(alias)
2540 self.assertEqual(st, os.stat(alias))
2541 self.assertFalse(stat.S_ISLNK(st.st_mode))
2542 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2543 # testing the first one we see is sufficient
2544 break
2545 else:
2546 self.skipTest("test requires an app execution alias")
2547
Tim Golden0321cf22014-05-05 19:46:17 +01002548@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2549class Win32JunctionTests(unittest.TestCase):
2550 junction = 'junctiontest'
2551 junction_target = os.path.dirname(os.path.abspath(__file__))
2552
2553 def setUp(self):
2554 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002555 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002556
2557 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002558 if os.path.lexists(self.junction):
2559 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002560
2561 def test_create_junction(self):
2562 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002563 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002564 self.assertTrue(os.path.exists(self.junction))
2565 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002566 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2567 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002568
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002569 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002570 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002571 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2572 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002573
2574 def test_unlink_removes_junction(self):
2575 _winapi.CreateJunction(self.junction_target, self.junction)
2576 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002577 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002578
2579 os.unlink(self.junction)
2580 self.assertFalse(os.path.exists(self.junction))
2581
Mark Becwarb82bfac2019-02-02 16:08:23 -05002582@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2583class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002584 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002585 nt = support.import_module('nt')
2586 ctypes = support.import_module('ctypes')
2587 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002588
2589 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2590 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2591
2592 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2593 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2594 ctypes.wintypes.LPDWORD)
2595
2596 # This is a pseudo-handle that doesn't need to be closed
2597 hproc = kernel.GetCurrentProcess()
2598
2599 handle_count = ctypes.wintypes.DWORD()
2600 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2601 self.assertEqual(1, ok)
2602
2603 before_count = handle_count.value
2604
2605 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002606 filenames = [
2607 r'\\?\C:',
2608 r'\\?\NUL',
2609 r'\\?\CONIN',
2610 __file__,
2611 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002612
Berker Peksag6ef726a2019-04-22 18:46:28 +03002613 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002614 for name in filenames:
2615 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002616 nt._getfinalpathname(name)
2617 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002618 # Failure is expected
2619 pass
2620 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002621 os.stat(name)
2622 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002623 pass
2624
2625 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2626 self.assertEqual(1, ok)
2627
2628 handle_delta = handle_count.value - before_count
2629
2630 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002631
Jason R. Coombs3a092862013-05-27 23:21:28 -04002632@support.skip_unless_symlink
2633class NonLocalSymlinkTests(unittest.TestCase):
2634
2635 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002636 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002637 Create this structure:
2638
2639 base
2640 \___ some_dir
2641 """
2642 os.makedirs('base/some_dir')
2643
2644 def tearDown(self):
2645 shutil.rmtree('base')
2646
2647 def test_directory_link_nonlocal(self):
2648 """
2649 The symlink target should resolve relative to the link, not relative
2650 to the current directory.
2651
2652 Then, link base/some_link -> base/some_dir and ensure that some_link
2653 is resolved as a directory.
2654
2655 In issue13772, it was discovered that directory detection failed if
2656 the symlink target was not specified relative to the current
2657 directory, which was a defect in the implementation.
2658 """
2659 src = os.path.join('base', 'some_link')
2660 os.symlink('some_dir', src)
2661 assert os.path.isdir(src)
2662
2663
Victor Stinnere8d51452010-08-19 01:05:19 +00002664class FSEncodingTests(unittest.TestCase):
2665 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002666 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2667 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002668
Victor Stinnere8d51452010-08-19 01:05:19 +00002669 def test_identity(self):
2670 # assert fsdecode(fsencode(x)) == x
2671 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2672 try:
2673 bytesfn = os.fsencode(fn)
2674 except UnicodeEncodeError:
2675 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002676 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002677
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002678
Brett Cannonefb00c02012-02-29 18:31:31 -05002679
2680class DeviceEncodingTests(unittest.TestCase):
2681
2682 def test_bad_fd(self):
2683 # Return None when an fd doesn't actually exist.
2684 self.assertIsNone(os.device_encoding(123456))
2685
Paul Monson62dfd7d2019-04-25 11:36:45 -07002686 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002687 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002688 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002689 def test_device_encoding(self):
2690 encoding = os.device_encoding(0)
2691 self.assertIsNotNone(encoding)
2692 self.assertTrue(codecs.lookup(encoding))
2693
2694
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002695class PidTests(unittest.TestCase):
2696 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2697 def test_getppid(self):
2698 p = subprocess.Popen([sys.executable, '-c',
2699 'import os; print(os.getppid())'],
2700 stdout=subprocess.PIPE)
2701 stdout, _ = p.communicate()
2702 # We are the parent of our subprocess
2703 self.assertEqual(int(stdout), os.getpid())
2704
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002705 def test_waitpid(self):
2706 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002707 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002708 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002709 status = os.waitpid(pid, 0)
2710 self.assertEqual(status, (pid, 0))
2711
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002712
Victor Stinner4659ccf2016-09-14 10:57:00 +02002713class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002714 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002715 self.exitcode = 17
2716
2717 filename = support.TESTFN
2718 self.addCleanup(support.unlink, filename)
2719
2720 if not with_env:
2721 code = 'import sys; sys.exit(%s)' % self.exitcode
2722 else:
2723 self.env = dict(os.environ)
2724 # create an unique key
2725 self.key = str(uuid.uuid4())
2726 self.env[self.key] = self.key
2727 # read the variable from os.environ to check that it exists
2728 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2729 % (self.key, self.exitcode))
2730
2731 with open(filename, "w") as fp:
2732 fp.write(code)
2733
Berker Peksag81816462016-09-15 20:19:47 +03002734 args = [sys.executable, filename]
2735 if use_bytes:
2736 args = [os.fsencode(a) for a in args]
2737 self.env = {os.fsencode(k): os.fsencode(v)
2738 for k, v in self.env.items()}
2739
2740 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002741
Berker Peksag4af23d72016-09-15 20:32:44 +03002742 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002743 def test_spawnl(self):
2744 args = self.create_args()
2745 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2746 self.assertEqual(exitcode, self.exitcode)
2747
Berker Peksag4af23d72016-09-15 20:32:44 +03002748 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002749 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002750 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002751 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2752 self.assertEqual(exitcode, self.exitcode)
2753
Berker Peksag4af23d72016-09-15 20:32:44 +03002754 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002755 def test_spawnlp(self):
2756 args = self.create_args()
2757 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2758 self.assertEqual(exitcode, self.exitcode)
2759
Berker Peksag4af23d72016-09-15 20:32:44 +03002760 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002761 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002762 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002763 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2764 self.assertEqual(exitcode, self.exitcode)
2765
Berker Peksag4af23d72016-09-15 20:32:44 +03002766 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002767 def test_spawnv(self):
2768 args = self.create_args()
2769 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2770 self.assertEqual(exitcode, self.exitcode)
2771
Berker Peksag4af23d72016-09-15 20:32:44 +03002772 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002773 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002774 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002775 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2776 self.assertEqual(exitcode, self.exitcode)
2777
Berker Peksag4af23d72016-09-15 20:32:44 +03002778 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002779 def test_spawnvp(self):
2780 args = self.create_args()
2781 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2782 self.assertEqual(exitcode, self.exitcode)
2783
Berker Peksag4af23d72016-09-15 20:32:44 +03002784 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002785 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002786 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002787 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2788 self.assertEqual(exitcode, self.exitcode)
2789
Berker Peksag4af23d72016-09-15 20:32:44 +03002790 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002791 def test_nowait(self):
2792 args = self.create_args()
2793 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2794 result = os.waitpid(pid, 0)
2795 self.assertEqual(result[0], pid)
2796 status = result[1]
2797 if hasattr(os, 'WIFEXITED'):
2798 self.assertTrue(os.WIFEXITED(status))
2799 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2800 else:
2801 self.assertEqual(status, self.exitcode << 8)
2802
Berker Peksag4af23d72016-09-15 20:32:44 +03002803 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002804 def test_spawnve_bytes(self):
2805 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2806 args = self.create_args(with_env=True, use_bytes=True)
2807 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2808 self.assertEqual(exitcode, self.exitcode)
2809
Steve Dower859fd7b2016-11-19 18:53:19 -08002810 @requires_os_func('spawnl')
2811 def test_spawnl_noargs(self):
2812 args = self.create_args()
2813 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002814 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002815
2816 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002817 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002818 args = self.create_args()
2819 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002820 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002821
2822 @requires_os_func('spawnv')
2823 def test_spawnv_noargs(self):
2824 args = self.create_args()
2825 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2826 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002827 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2828 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002829
2830 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002831 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002832 args = self.create_args()
2833 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2834 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002835 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2836 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002837
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002838 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002839 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002840
Ville Skyttä49b27342017-08-03 09:00:59 +03002841 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002842 newenv = os.environ.copy()
2843 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2844 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002845 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002846 except ValueError:
2847 pass
2848 else:
2849 self.assertEqual(exitcode, 127)
2850
Ville Skyttä49b27342017-08-03 09:00:59 +03002851 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002852 newenv = os.environ.copy()
2853 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2854 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002855 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002856 except ValueError:
2857 pass
2858 else:
2859 self.assertEqual(exitcode, 127)
2860
Ville Skyttä49b27342017-08-03 09:00:59 +03002861 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002862 newenv = os.environ.copy()
2863 newenv["FRUIT=ORANGE"] = "lemon"
2864 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002865 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002866 except ValueError:
2867 pass
2868 else:
2869 self.assertEqual(exitcode, 127)
2870
Ville Skyttä49b27342017-08-03 09:00:59 +03002871 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002872 filename = support.TESTFN
2873 self.addCleanup(support.unlink, filename)
2874 with open(filename, "w") as fp:
2875 fp.write('import sys, os\n'
2876 'if os.getenv("FRUIT") != "orange=lemon":\n'
2877 ' raise AssertionError')
2878 args = [sys.executable, filename]
2879 newenv = os.environ.copy()
2880 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002881 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002882 self.assertEqual(exitcode, 0)
2883
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002884 @requires_os_func('spawnve')
2885 def test_spawnve_invalid_env(self):
2886 self._test_invalid_env(os.spawnve)
2887
2888 @requires_os_func('spawnvpe')
2889 def test_spawnvpe_invalid_env(self):
2890 self._test_invalid_env(os.spawnvpe)
2891
Serhiy Storchaka77703942017-06-25 07:33:01 +03002892
Brian Curtin0151b8e2010-09-24 13:43:43 +00002893# The introduction of this TestCase caused at least two different errors on
2894# *nix buildbots. Temporarily skip this to let the buildbots move along.
2895@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002896@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2897class LoginTests(unittest.TestCase):
2898 def test_getlogin(self):
2899 user_name = os.getlogin()
2900 self.assertNotEqual(len(user_name), 0)
2901
2902
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002903@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2904 "needs os.getpriority and os.setpriority")
2905class ProgramPriorityTests(unittest.TestCase):
2906 """Tests for os.getpriority() and os.setpriority()."""
2907
2908 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002909
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002910 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2911 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2912 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002913 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2914 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002915 raise unittest.SkipTest("unable to reliably test setpriority "
2916 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002917 else:
2918 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002919 finally:
2920 try:
2921 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2922 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002923 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002924 raise
2925
2926
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002928
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002929 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002930
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 def __init__(self, conn):
2932 asynchat.async_chat.__init__(self, conn)
2933 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002934 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002935 self.closed = False
2936 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002937
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002938 def handle_read(self):
2939 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002940 if self.accumulate:
2941 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002942
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 def get_data(self):
2944 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002945
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002946 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002947 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002949
2950 def handle_error(self):
2951 raise
2952
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 def __init__(self, address):
2954 threading.Thread.__init__(self)
2955 asyncore.dispatcher.__init__(self)
2956 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2957 self.bind(address)
2958 self.listen(5)
2959 self.host, self.port = self.socket.getsockname()[:2]
2960 self.handler_instance = None
2961 self._active = False
2962 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002963
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002964 # --- public API
2965
2966 @property
2967 def running(self):
2968 return self._active
2969
2970 def start(self):
2971 assert not self.running
2972 self.__flag = threading.Event()
2973 threading.Thread.start(self)
2974 self.__flag.wait()
2975
2976 def stop(self):
2977 assert self.running
2978 self._active = False
2979 self.join()
2980
2981 def wait(self):
2982 # wait for handler connection to be closed, then stop the server
2983 while not getattr(self.handler_instance, "closed", False):
2984 time.sleep(0.001)
2985 self.stop()
2986
2987 # --- internals
2988
2989 def run(self):
2990 self._active = True
2991 self.__flag.set()
2992 while self._active and asyncore.socket_map:
2993 self._active_lock.acquire()
2994 asyncore.loop(timeout=0.001, count=1)
2995 self._active_lock.release()
2996 asyncore.close_all()
2997
2998 def handle_accept(self):
2999 conn, addr = self.accept()
3000 self.handler_instance = self.Handler(conn)
3001
3002 def handle_connect(self):
3003 self.close()
3004 handle_read = handle_connect
3005
3006 def writable(self):
3007 return 0
3008
3009 def handle_error(self):
3010 raise
3011
3012
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003013@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3014class TestSendfile(unittest.TestCase):
3015
Victor Stinner8c663fd2017-11-08 14:44:44 -08003016 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003017 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003018 not sys.platform.startswith("solaris") and \
3019 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003020 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3021 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003022 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3023 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003024
3025 @classmethod
3026 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003027 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01003028 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003029
3030 @classmethod
3031 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003032 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003033 support.unlink(support.TESTFN)
3034
3035 def setUp(self):
3036 self.server = SendfileTestServer((support.HOST, 0))
3037 self.server.start()
3038 self.client = socket.socket()
3039 self.client.connect((self.server.host, self.server.port))
3040 self.client.settimeout(1)
3041 # synchronize by waiting for "220 ready" response
3042 self.client.recv(1024)
3043 self.sockno = self.client.fileno()
3044 self.file = open(support.TESTFN, 'rb')
3045 self.fileno = self.file.fileno()
3046
3047 def tearDown(self):
3048 self.file.close()
3049 self.client.close()
3050 if self.server.running:
3051 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003052 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003053
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003054 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003055 """A higher level wrapper representing how an application is
3056 supposed to use sendfile().
3057 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003058 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003059 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003060 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003061 except OSError as err:
3062 if err.errno == errno.ECONNRESET:
3063 # disconnected
3064 raise
3065 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3066 # we have to retry send data
3067 continue
3068 else:
3069 raise
3070
3071 def test_send_whole_file(self):
3072 # normal send
3073 total_sent = 0
3074 offset = 0
3075 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003076 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003077 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3078 if sent == 0:
3079 break
3080 offset += sent
3081 total_sent += sent
3082 self.assertTrue(sent <= nbytes)
3083 self.assertEqual(offset, total_sent)
3084
3085 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003086 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003087 self.client.close()
3088 self.server.wait()
3089 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003090 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003091 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003092
3093 def test_send_at_certain_offset(self):
3094 # start sending a file at a certain offset
3095 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003096 offset = len(self.DATA) // 2
3097 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003098 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003099 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003100 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3101 if sent == 0:
3102 break
3103 offset += sent
3104 total_sent += sent
3105 self.assertTrue(sent <= nbytes)
3106
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003107 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003108 self.client.close()
3109 self.server.wait()
3110 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003111 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003112 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003113 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003114 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003115
3116 def test_offset_overflow(self):
3117 # specify an offset > file size
3118 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003119 try:
3120 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3121 except OSError as e:
3122 # Solaris can raise EINVAL if offset >= file length, ignore.
3123 if e.errno != errno.EINVAL:
3124 raise
3125 else:
3126 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003127 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003128 self.client.close()
3129 self.server.wait()
3130 data = self.server.handler_instance.get_data()
3131 self.assertEqual(data, b'')
3132
3133 def test_invalid_offset(self):
3134 with self.assertRaises(OSError) as cm:
3135 os.sendfile(self.sockno, self.fileno, -1, 4096)
3136 self.assertEqual(cm.exception.errno, errno.EINVAL)
3137
Martin Panterbf19d162015-09-09 01:01:13 +00003138 def test_keywords(self):
3139 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003140 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3141 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003142 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003143 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3144 offset=0, count=4096,
3145 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003146
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003147 # --- headers / trailers tests
3148
Serhiy Storchaka43767632013-11-03 21:31:38 +02003149 @requires_headers_trailers
3150 def test_headers(self):
3151 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003152 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003153 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003154 headers=[b"x" * 512, b"y" * 256])
3155 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003156 total_sent += sent
3157 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003158 while total_sent < len(expected_data):
3159 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003160 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3161 offset, nbytes)
3162 if sent == 0:
3163 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003164 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003165 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003166 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003167
Serhiy Storchaka43767632013-11-03 21:31:38 +02003168 self.assertEqual(total_sent, len(expected_data))
3169 self.client.close()
3170 self.server.wait()
3171 data = self.server.handler_instance.get_data()
3172 self.assertEqual(hash(data), hash(expected_data))
3173
3174 @requires_headers_trailers
3175 def test_trailers(self):
3176 TESTFN2 = support.TESTFN + "2"
3177 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003178
3179 self.addCleanup(support.unlink, TESTFN2)
3180 create_file(TESTFN2, file_data)
3181
3182 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003183 os.sendfile(self.sockno, f.fileno(), 0, 5,
3184 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003185 self.client.close()
3186 self.server.wait()
3187 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003188 self.assertEqual(data, b"abcde123456789")
3189
3190 @requires_headers_trailers
3191 @requires_32b
3192 def test_headers_overflow_32bits(self):
3193 self.server.handler_instance.accumulate = False
3194 with self.assertRaises(OSError) as cm:
3195 os.sendfile(self.sockno, self.fileno, 0, 0,
3196 headers=[b"x" * 2**16] * 2**15)
3197 self.assertEqual(cm.exception.errno, errno.EINVAL)
3198
3199 @requires_headers_trailers
3200 @requires_32b
3201 def test_trailers_overflow_32bits(self):
3202 self.server.handler_instance.accumulate = False
3203 with self.assertRaises(OSError) as cm:
3204 os.sendfile(self.sockno, self.fileno, 0, 0,
3205 trailers=[b"x" * 2**16] * 2**15)
3206 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003207
Serhiy Storchaka43767632013-11-03 21:31:38 +02003208 @requires_headers_trailers
3209 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3210 'test needs os.SF_NODISKIO')
3211 def test_flags(self):
3212 try:
3213 os.sendfile(self.sockno, self.fileno, 0, 4096,
3214 flags=os.SF_NODISKIO)
3215 except OSError as err:
3216 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3217 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003218
3219
Larry Hastings9cf065c2012-06-22 16:30:09 -07003220def supports_extended_attributes():
3221 if not hasattr(os, "setxattr"):
3222 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003223
Larry Hastings9cf065c2012-06-22 16:30:09 -07003224 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003225 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003226 try:
3227 os.setxattr(fp.fileno(), b"user.test", b"")
3228 except OSError:
3229 return False
3230 finally:
3231 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003232
3233 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003234
3235
3236@unittest.skipUnless(supports_extended_attributes(),
3237 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003238# Kernels < 2.6.39 don't respect setxattr flags.
3239@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003240class ExtendedAttributeTests(unittest.TestCase):
3241
Larry Hastings9cf065c2012-06-22 16:30:09 -07003242 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003243 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003244 self.addCleanup(support.unlink, fn)
3245 create_file(fn)
3246
Benjamin Peterson799bd802011-08-31 22:15:17 -04003247 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003248 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003249 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003250
Victor Stinnerf12e5062011-10-16 22:12:03 +02003251 init_xattr = listxattr(fn)
3252 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003253
Larry Hastings9cf065c2012-06-22 16:30:09 -07003254 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003255 xattr = set(init_xattr)
3256 xattr.add("user.test")
3257 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003258 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3259 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3260 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003261
Benjamin Peterson799bd802011-08-31 22:15:17 -04003262 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003263 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003264 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003265
Benjamin Peterson799bd802011-08-31 22:15:17 -04003266 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003267 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003268 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003269
Larry Hastings9cf065c2012-06-22 16:30:09 -07003270 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003271 xattr.add("user.test2")
3272 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003273 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003274
Benjamin Peterson799bd802011-08-31 22:15:17 -04003275 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003276 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003277 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003278
Victor Stinnerf12e5062011-10-16 22:12:03 +02003279 xattr.remove("user.test")
3280 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003281 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3282 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3283 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3284 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003285 many = sorted("user.test{}".format(i) for i in range(100))
3286 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003287 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003288 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003289
Larry Hastings9cf065c2012-06-22 16:30:09 -07003290 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003291 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003292 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003293
3294 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3295 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003296
3297 def test_simple(self):
3298 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3299 os.listxattr)
3300
3301 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003302 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3303 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003304
3305 def test_fds(self):
3306 def getxattr(path, *args):
3307 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003308 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003309 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003310 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003311 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003312 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003313 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003314 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003315 def listxattr(path, *args):
3316 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003317 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003318 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3319
3320
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003321@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3322class TermsizeTests(unittest.TestCase):
3323 def test_does_not_crash(self):
3324 """Check if get_terminal_size() returns a meaningful value.
3325
3326 There's no easy portable way to actually check the size of the
3327 terminal, so let's check if it returns something sensible instead.
3328 """
3329 try:
3330 size = os.get_terminal_size()
3331 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003332 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003333 # Under win32 a generic OSError can be thrown if the
3334 # handle cannot be retrieved
3335 self.skipTest("failed to query terminal size")
3336 raise
3337
Antoine Pitroucfade362012-02-08 23:48:59 +01003338 self.assertGreaterEqual(size.columns, 0)
3339 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003340
3341 def test_stty_match(self):
3342 """Check if stty returns the same results
3343
3344 stty actually tests stdin, so get_terminal_size is invoked on
3345 stdin explicitly. If stty succeeded, then get_terminal_size()
3346 should work too.
3347 """
3348 try:
3349 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003350 except (FileNotFoundError, subprocess.CalledProcessError,
3351 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003352 self.skipTest("stty invocation failed")
3353 expected = (int(size[1]), int(size[0])) # reversed order
3354
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003355 try:
3356 actual = os.get_terminal_size(sys.__stdin__.fileno())
3357 except OSError as e:
3358 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3359 # Under win32 a generic OSError can be thrown if the
3360 # handle cannot be retrieved
3361 self.skipTest("failed to query terminal size")
3362 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003363 self.assertEqual(expected, actual)
3364
3365
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003366@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003367@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003368class MemfdCreateTests(unittest.TestCase):
3369 def test_memfd_create(self):
3370 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3371 self.assertNotEqual(fd, -1)
3372 self.addCleanup(os.close, fd)
3373 self.assertFalse(os.get_inheritable(fd))
3374 with open(fd, "wb", closefd=False) as f:
3375 f.write(b'memfd_create')
3376 self.assertEqual(f.tell(), 12)
3377
3378 fd2 = os.memfd_create("Hi")
3379 self.addCleanup(os.close, fd2)
3380 self.assertFalse(os.get_inheritable(fd2))
3381
3382
Victor Stinner292c8352012-10-30 02:17:38 +01003383class OSErrorTests(unittest.TestCase):
3384 def setUp(self):
3385 class Str(str):
3386 pass
3387
Victor Stinnerafe17062012-10-31 22:47:43 +01003388 self.bytes_filenames = []
3389 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003390 if support.TESTFN_UNENCODABLE is not None:
3391 decoded = support.TESTFN_UNENCODABLE
3392 else:
3393 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003394 self.unicode_filenames.append(decoded)
3395 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003396 if support.TESTFN_UNDECODABLE is not None:
3397 encoded = support.TESTFN_UNDECODABLE
3398 else:
3399 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003400 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003401 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003402 self.bytes_filenames.append(memoryview(encoded))
3403
3404 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003405
3406 def test_oserror_filename(self):
3407 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003408 (self.filenames, os.chdir,),
3409 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003410 (self.filenames, os.lstat,),
3411 (self.filenames, os.open, os.O_RDONLY),
3412 (self.filenames, os.rmdir,),
3413 (self.filenames, os.stat,),
3414 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003415 ]
3416 if sys.platform == "win32":
3417 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003418 (self.bytes_filenames, os.rename, b"dst"),
3419 (self.bytes_filenames, os.replace, b"dst"),
3420 (self.unicode_filenames, os.rename, "dst"),
3421 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003422 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003423 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003424 else:
3425 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003426 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003427 (self.filenames, os.rename, "dst"),
3428 (self.filenames, os.replace, "dst"),
3429 ))
3430 if hasattr(os, "chown"):
3431 funcs.append((self.filenames, os.chown, 0, 0))
3432 if hasattr(os, "lchown"):
3433 funcs.append((self.filenames, os.lchown, 0, 0))
3434 if hasattr(os, "truncate"):
3435 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003436 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003437 funcs.append((self.filenames, os.chflags, 0))
3438 if hasattr(os, "lchflags"):
3439 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003440 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003441 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003442 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003443 if sys.platform == "win32":
3444 funcs.append((self.bytes_filenames, os.link, b"dst"))
3445 funcs.append((self.unicode_filenames, os.link, "dst"))
3446 else:
3447 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003448 if hasattr(os, "listxattr"):
3449 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003450 (self.filenames, os.listxattr,),
3451 (self.filenames, os.getxattr, "user.test"),
3452 (self.filenames, os.setxattr, "user.test", b'user'),
3453 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003454 ))
3455 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003456 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003457 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003458 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003459
Steve Dowercc16be82016-09-08 10:35:16 -07003460
Victor Stinnerafe17062012-10-31 22:47:43 +01003461 for filenames, func, *func_args in funcs:
3462 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003463 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003464 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003465 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003466 else:
3467 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3468 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003469 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003470 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003471 except UnicodeDecodeError:
3472 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003473 else:
3474 self.fail("No exception thrown by {}".format(func))
3475
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003476class CPUCountTests(unittest.TestCase):
3477 def test_cpu_count(self):
3478 cpus = os.cpu_count()
3479 if cpus is not None:
3480 self.assertIsInstance(cpus, int)
3481 self.assertGreater(cpus, 0)
3482 else:
3483 self.skipTest("Could not determine the number of CPUs")
3484
Victor Stinnerdaf45552013-08-28 00:53:59 +02003485
3486class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003487 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003488 fd = os.open(__file__, os.O_RDONLY)
3489 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003490 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003491
Victor Stinnerdaf45552013-08-28 00:53:59 +02003492 os.set_inheritable(fd, True)
3493 self.assertEqual(os.get_inheritable(fd), True)
3494
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003495 @unittest.skipIf(fcntl is None, "need fcntl")
3496 def test_get_inheritable_cloexec(self):
3497 fd = os.open(__file__, os.O_RDONLY)
3498 self.addCleanup(os.close, fd)
3499 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003500
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003501 # clear FD_CLOEXEC flag
3502 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3503 flags &= ~fcntl.FD_CLOEXEC
3504 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003505
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003506 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003507
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003508 @unittest.skipIf(fcntl is None, "need fcntl")
3509 def test_set_inheritable_cloexec(self):
3510 fd = os.open(__file__, os.O_RDONLY)
3511 self.addCleanup(os.close, fd)
3512 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3513 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003514
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003515 os.set_inheritable(fd, True)
3516 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3517 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003518
Victor Stinnerdaf45552013-08-28 00:53:59 +02003519 def test_open(self):
3520 fd = os.open(__file__, os.O_RDONLY)
3521 self.addCleanup(os.close, fd)
3522 self.assertEqual(os.get_inheritable(fd), False)
3523
3524 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3525 def test_pipe(self):
3526 rfd, wfd = os.pipe()
3527 self.addCleanup(os.close, rfd)
3528 self.addCleanup(os.close, wfd)
3529 self.assertEqual(os.get_inheritable(rfd), False)
3530 self.assertEqual(os.get_inheritable(wfd), False)
3531
3532 def test_dup(self):
3533 fd1 = os.open(__file__, os.O_RDONLY)
3534 self.addCleanup(os.close, fd1)
3535
3536 fd2 = os.dup(fd1)
3537 self.addCleanup(os.close, fd2)
3538 self.assertEqual(os.get_inheritable(fd2), False)
3539
Zackery Spytz5be66602019-08-23 12:38:41 -06003540 def test_dup_standard_stream(self):
3541 fd = os.dup(1)
3542 self.addCleanup(os.close, fd)
3543 self.assertGreater(fd, 0)
3544
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003545 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3546 def test_dup_nul(self):
3547 # os.dup() was creating inheritable fds for character files.
3548 fd1 = os.open('NUL', os.O_RDONLY)
3549 self.addCleanup(os.close, fd1)
3550 fd2 = os.dup(fd1)
3551 self.addCleanup(os.close, fd2)
3552 self.assertFalse(os.get_inheritable(fd2))
3553
Victor Stinnerdaf45552013-08-28 00:53:59 +02003554 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3555 def test_dup2(self):
3556 fd = os.open(__file__, os.O_RDONLY)
3557 self.addCleanup(os.close, fd)
3558
3559 # inheritable by default
3560 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003561 self.addCleanup(os.close, fd2)
3562 self.assertEqual(os.dup2(fd, fd2), fd2)
3563 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003564
3565 # force non-inheritable
3566 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003567 self.addCleanup(os.close, fd3)
3568 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3569 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003570
3571 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3572 def test_openpty(self):
3573 master_fd, slave_fd = os.openpty()
3574 self.addCleanup(os.close, master_fd)
3575 self.addCleanup(os.close, slave_fd)
3576 self.assertEqual(os.get_inheritable(master_fd), False)
3577 self.assertEqual(os.get_inheritable(slave_fd), False)
3578
3579
Brett Cannon3f9183b2016-08-26 14:44:48 -07003580class PathTConverterTests(unittest.TestCase):
3581 # tuples of (function name, allows fd arguments, additional arguments to
3582 # function, cleanup function)
3583 functions = [
3584 ('stat', True, (), None),
3585 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003586 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003587 ('chflags', False, (0,), None),
3588 ('lchflags', False, (0,), None),
3589 ('open', False, (0,), getattr(os, 'close', None)),
3590 ]
3591
3592 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003593 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003594 if os.name == 'nt':
3595 bytes_fspath = bytes_filename = None
3596 else:
3597 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003598 bytes_fspath = FakePath(bytes_filename)
3599 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003600 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003601 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003602
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003603 int_fspath = FakePath(fd)
3604 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003605
3606 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3607 with self.subTest(name=name):
3608 try:
3609 fn = getattr(os, name)
3610 except AttributeError:
3611 continue
3612
Brett Cannon8f96a302016-08-26 19:30:11 -07003613 for path in (str_filename, bytes_filename, str_fspath,
3614 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003615 if path is None:
3616 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003617 with self.subTest(name=name, path=path):
3618 result = fn(path, *extra_args)
3619 if cleanup_fn is not None:
3620 cleanup_fn(result)
3621
3622 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003623 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003624 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003625
3626 if allow_fd:
3627 result = fn(fd, *extra_args) # should not fail
3628 if cleanup_fn is not None:
3629 cleanup_fn(result)
3630 else:
3631 with self.assertRaisesRegex(
3632 TypeError,
3633 'os.PathLike'):
3634 fn(fd, *extra_args)
3635
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003636 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003637 msg = r'__fspath__\(\) to return str or bytes, not %s'
3638 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003639 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003640 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003641 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003642 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003643 os.stat(FakePath(object()))
3644
Brett Cannon3f9183b2016-08-26 14:44:48 -07003645
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003646@unittest.skipUnless(hasattr(os, 'get_blocking'),
3647 'needs os.get_blocking() and os.set_blocking()')
3648class BlockingTests(unittest.TestCase):
3649 def test_blocking(self):
3650 fd = os.open(__file__, os.O_RDONLY)
3651 self.addCleanup(os.close, fd)
3652 self.assertEqual(os.get_blocking(fd), True)
3653
3654 os.set_blocking(fd, False)
3655 self.assertEqual(os.get_blocking(fd), False)
3656
3657 os.set_blocking(fd, True)
3658 self.assertEqual(os.get_blocking(fd), True)
3659
3660
Yury Selivanov97e2e062014-09-26 12:33:06 -04003661
3662class ExportsTests(unittest.TestCase):
3663 def test_os_all(self):
3664 self.assertIn('open', os.__all__)
3665 self.assertIn('walk', os.__all__)
3666
3667
Eddie Elizondob3966632019-11-05 07:16:14 -08003668class TestDirEntry(unittest.TestCase):
3669 def setUp(self):
3670 self.path = os.path.realpath(support.TESTFN)
3671 self.addCleanup(support.rmtree, self.path)
3672 os.mkdir(self.path)
3673
3674 def test_uninstantiable(self):
3675 self.assertRaises(TypeError, os.DirEntry)
3676
3677 def test_unpickable(self):
3678 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3679 entry = [entry for entry in os.scandir(self.path)].pop()
3680 self.assertIsInstance(entry, os.DirEntry)
3681 self.assertEqual(entry.name, "file.txt")
3682 import pickle
3683 self.assertRaises(TypeError, pickle.dumps, entry, filename)
3684
3685
Victor Stinner6036e442015-03-08 01:58:04 +01003686class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003687 check_no_resource_warning = support.check_no_resource_warning
3688
Victor Stinner6036e442015-03-08 01:58:04 +01003689 def setUp(self):
3690 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003691 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003692 self.addCleanup(support.rmtree, self.path)
3693 os.mkdir(self.path)
3694
3695 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003696 path = self.bytes_path if isinstance(name, bytes) else self.path
3697 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003698 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003699 return filename
3700
3701 def get_entries(self, names):
3702 entries = dict((entry.name, entry)
3703 for entry in os.scandir(self.path))
3704 self.assertEqual(sorted(entries.keys()), names)
3705 return entries
3706
3707 def assert_stat_equal(self, stat1, stat2, skip_fields):
3708 if skip_fields:
3709 for attr in dir(stat1):
3710 if not attr.startswith("st_"):
3711 continue
3712 if attr in ("st_dev", "st_ino", "st_nlink"):
3713 continue
3714 self.assertEqual(getattr(stat1, attr),
3715 getattr(stat2, attr),
3716 (stat1, stat2, attr))
3717 else:
3718 self.assertEqual(stat1, stat2)
3719
Eddie Elizondob3966632019-11-05 07:16:14 -08003720 def test_uninstantiable(self):
3721 scandir_iter = os.scandir(self.path)
3722 self.assertRaises(TypeError, type(scandir_iter))
3723 scandir_iter.close()
3724
3725 def test_unpickable(self):
3726 filename = self.create_file("file.txt")
3727 scandir_iter = os.scandir(self.path)
3728 import pickle
3729 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3730 scandir_iter.close()
3731
Victor Stinner6036e442015-03-08 01:58:04 +01003732 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003733 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003734 self.assertEqual(entry.name, name)
3735 self.assertEqual(entry.path, os.path.join(self.path, name))
3736 self.assertEqual(entry.inode(),
3737 os.stat(entry.path, follow_symlinks=False).st_ino)
3738
3739 entry_stat = os.stat(entry.path)
3740 self.assertEqual(entry.is_dir(),
3741 stat.S_ISDIR(entry_stat.st_mode))
3742 self.assertEqual(entry.is_file(),
3743 stat.S_ISREG(entry_stat.st_mode))
3744 self.assertEqual(entry.is_symlink(),
3745 os.path.islink(entry.path))
3746
3747 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3748 self.assertEqual(entry.is_dir(follow_symlinks=False),
3749 stat.S_ISDIR(entry_lstat.st_mode))
3750 self.assertEqual(entry.is_file(follow_symlinks=False),
3751 stat.S_ISREG(entry_lstat.st_mode))
3752
3753 self.assert_stat_equal(entry.stat(),
3754 entry_stat,
3755 os.name == 'nt' and not is_symlink)
3756 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3757 entry_lstat,
3758 os.name == 'nt')
3759
3760 def test_attributes(self):
3761 link = hasattr(os, 'link')
3762 symlink = support.can_symlink()
3763
3764 dirname = os.path.join(self.path, "dir")
3765 os.mkdir(dirname)
3766 filename = self.create_file("file.txt")
3767 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003768 try:
3769 os.link(filename, os.path.join(self.path, "link_file.txt"))
3770 except PermissionError as e:
3771 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003772 if symlink:
3773 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3774 target_is_directory=True)
3775 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3776
3777 names = ['dir', 'file.txt']
3778 if link:
3779 names.append('link_file.txt')
3780 if symlink:
3781 names.extend(('symlink_dir', 'symlink_file.txt'))
3782 entries = self.get_entries(names)
3783
3784 entry = entries['dir']
3785 self.check_entry(entry, 'dir', True, False, False)
3786
3787 entry = entries['file.txt']
3788 self.check_entry(entry, 'file.txt', False, True, False)
3789
3790 if link:
3791 entry = entries['link_file.txt']
3792 self.check_entry(entry, 'link_file.txt', False, True, False)
3793
3794 if symlink:
3795 entry = entries['symlink_dir']
3796 self.check_entry(entry, 'symlink_dir', True, False, True)
3797
3798 entry = entries['symlink_file.txt']
3799 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3800
3801 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003802 path = self.bytes_path if isinstance(name, bytes) else self.path
3803 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003804 self.assertEqual(len(entries), 1)
3805
3806 entry = entries[0]
3807 self.assertEqual(entry.name, name)
3808 return entry
3809
Brett Cannon96881cd2016-06-10 14:37:21 -07003810 def create_file_entry(self, name='file.txt'):
3811 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003812 return self.get_entry(os.path.basename(filename))
3813
3814 def test_current_directory(self):
3815 filename = self.create_file()
3816 old_dir = os.getcwd()
3817 try:
3818 os.chdir(self.path)
3819
3820 # call scandir() without parameter: it must list the content
3821 # of the current directory
3822 entries = dict((entry.name, entry) for entry in os.scandir())
3823 self.assertEqual(sorted(entries.keys()),
3824 [os.path.basename(filename)])
3825 finally:
3826 os.chdir(old_dir)
3827
3828 def test_repr(self):
3829 entry = self.create_file_entry()
3830 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3831
Brett Cannon96881cd2016-06-10 14:37:21 -07003832 def test_fspath_protocol(self):
3833 entry = self.create_file_entry()
3834 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3835
3836 def test_fspath_protocol_bytes(self):
3837 bytes_filename = os.fsencode('bytesfile.txt')
3838 bytes_entry = self.create_file_entry(name=bytes_filename)
3839 fspath = os.fspath(bytes_entry)
3840 self.assertIsInstance(fspath, bytes)
3841 self.assertEqual(fspath,
3842 os.path.join(os.fsencode(self.path),bytes_filename))
3843
Victor Stinner6036e442015-03-08 01:58:04 +01003844 def test_removed_dir(self):
3845 path = os.path.join(self.path, 'dir')
3846
3847 os.mkdir(path)
3848 entry = self.get_entry('dir')
3849 os.rmdir(path)
3850
3851 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3852 if os.name == 'nt':
3853 self.assertTrue(entry.is_dir())
3854 self.assertFalse(entry.is_file())
3855 self.assertFalse(entry.is_symlink())
3856 if os.name == 'nt':
3857 self.assertRaises(FileNotFoundError, entry.inode)
3858 # don't fail
3859 entry.stat()
3860 entry.stat(follow_symlinks=False)
3861 else:
3862 self.assertGreater(entry.inode(), 0)
3863 self.assertRaises(FileNotFoundError, entry.stat)
3864 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3865
3866 def test_removed_file(self):
3867 entry = self.create_file_entry()
3868 os.unlink(entry.path)
3869
3870 self.assertFalse(entry.is_dir())
3871 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3872 if os.name == 'nt':
3873 self.assertTrue(entry.is_file())
3874 self.assertFalse(entry.is_symlink())
3875 if os.name == 'nt':
3876 self.assertRaises(FileNotFoundError, entry.inode)
3877 # don't fail
3878 entry.stat()
3879 entry.stat(follow_symlinks=False)
3880 else:
3881 self.assertGreater(entry.inode(), 0)
3882 self.assertRaises(FileNotFoundError, entry.stat)
3883 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3884
3885 def test_broken_symlink(self):
3886 if not support.can_symlink():
3887 return self.skipTest('cannot create symbolic link')
3888
3889 filename = self.create_file("file.txt")
3890 os.symlink(filename,
3891 os.path.join(self.path, "symlink.txt"))
3892 entries = self.get_entries(['file.txt', 'symlink.txt'])
3893 entry = entries['symlink.txt']
3894 os.unlink(filename)
3895
3896 self.assertGreater(entry.inode(), 0)
3897 self.assertFalse(entry.is_dir())
3898 self.assertFalse(entry.is_file()) # broken symlink returns False
3899 self.assertFalse(entry.is_dir(follow_symlinks=False))
3900 self.assertFalse(entry.is_file(follow_symlinks=False))
3901 self.assertTrue(entry.is_symlink())
3902 self.assertRaises(FileNotFoundError, entry.stat)
3903 # don't fail
3904 entry.stat(follow_symlinks=False)
3905
3906 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003907 self.create_file("file.txt")
3908
3909 path_bytes = os.fsencode(self.path)
3910 entries = list(os.scandir(path_bytes))
3911 self.assertEqual(len(entries), 1, entries)
3912 entry = entries[0]
3913
3914 self.assertEqual(entry.name, b'file.txt')
3915 self.assertEqual(entry.path,
3916 os.fsencode(os.path.join(self.path, 'file.txt')))
3917
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003918 def test_bytes_like(self):
3919 self.create_file("file.txt")
3920
3921 for cls in bytearray, memoryview:
3922 path_bytes = cls(os.fsencode(self.path))
3923 with self.assertWarns(DeprecationWarning):
3924 entries = list(os.scandir(path_bytes))
3925 self.assertEqual(len(entries), 1, entries)
3926 entry = entries[0]
3927
3928 self.assertEqual(entry.name, b'file.txt')
3929 self.assertEqual(entry.path,
3930 os.fsencode(os.path.join(self.path, 'file.txt')))
3931 self.assertIs(type(entry.name), bytes)
3932 self.assertIs(type(entry.path), bytes)
3933
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003934 @unittest.skipUnless(os.listdir in os.supports_fd,
3935 'fd support for listdir required for this test.')
3936 def test_fd(self):
3937 self.assertIn(os.scandir, os.supports_fd)
3938 self.create_file('file.txt')
3939 expected_names = ['file.txt']
3940 if support.can_symlink():
3941 os.symlink('file.txt', os.path.join(self.path, 'link'))
3942 expected_names.append('link')
3943
3944 fd = os.open(self.path, os.O_RDONLY)
3945 try:
3946 with os.scandir(fd) as it:
3947 entries = list(it)
3948 names = [entry.name for entry in entries]
3949 self.assertEqual(sorted(names), expected_names)
3950 self.assertEqual(names, os.listdir(fd))
3951 for entry in entries:
3952 self.assertEqual(entry.path, entry.name)
3953 self.assertEqual(os.fspath(entry), entry.name)
3954 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3955 if os.stat in os.supports_dir_fd:
3956 st = os.stat(entry.name, dir_fd=fd)
3957 self.assertEqual(entry.stat(), st)
3958 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3959 self.assertEqual(entry.stat(follow_symlinks=False), st)
3960 finally:
3961 os.close(fd)
3962
Victor Stinner6036e442015-03-08 01:58:04 +01003963 def test_empty_path(self):
3964 self.assertRaises(FileNotFoundError, os.scandir, '')
3965
3966 def test_consume_iterator_twice(self):
3967 self.create_file("file.txt")
3968 iterator = os.scandir(self.path)
3969
3970 entries = list(iterator)
3971 self.assertEqual(len(entries), 1, entries)
3972
3973 # check than consuming the iterator twice doesn't raise exception
3974 entries2 = list(iterator)
3975 self.assertEqual(len(entries2), 0, entries2)
3976
3977 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003978 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003979 self.assertRaises(TypeError, os.scandir, obj)
3980
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003981 def test_close(self):
3982 self.create_file("file.txt")
3983 self.create_file("file2.txt")
3984 iterator = os.scandir(self.path)
3985 next(iterator)
3986 iterator.close()
3987 # multiple closes
3988 iterator.close()
3989 with self.check_no_resource_warning():
3990 del iterator
3991
3992 def test_context_manager(self):
3993 self.create_file("file.txt")
3994 self.create_file("file2.txt")
3995 with os.scandir(self.path) as iterator:
3996 next(iterator)
3997 with self.check_no_resource_warning():
3998 del iterator
3999
4000 def test_context_manager_close(self):
4001 self.create_file("file.txt")
4002 self.create_file("file2.txt")
4003 with os.scandir(self.path) as iterator:
4004 next(iterator)
4005 iterator.close()
4006
4007 def test_context_manager_exception(self):
4008 self.create_file("file.txt")
4009 self.create_file("file2.txt")
4010 with self.assertRaises(ZeroDivisionError):
4011 with os.scandir(self.path) as iterator:
4012 next(iterator)
4013 1/0
4014 with self.check_no_resource_warning():
4015 del iterator
4016
4017 def test_resource_warning(self):
4018 self.create_file("file.txt")
4019 self.create_file("file2.txt")
4020 iterator = os.scandir(self.path)
4021 next(iterator)
4022 with self.assertWarns(ResourceWarning):
4023 del iterator
4024 support.gc_collect()
4025 # exhausted iterator
4026 iterator = os.scandir(self.path)
4027 list(iterator)
4028 with self.check_no_resource_warning():
4029 del iterator
4030
Victor Stinner6036e442015-03-08 01:58:04 +01004031
Ethan Furmancdc08792016-06-02 15:06:09 -07004032class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004033
4034 # Abstracted so it can be overridden to test pure Python implementation
4035 # if a C version is provided.
4036 fspath = staticmethod(os.fspath)
4037
Ethan Furmancdc08792016-06-02 15:06:09 -07004038 def test_return_bytes(self):
4039 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004040 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004041
4042 def test_return_string(self):
4043 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004044 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004045
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004046 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004047 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004048 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004049
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004050 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004051 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4052 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4053
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004054 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004055 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4056 self.assertTrue(issubclass(FakePath, os.PathLike))
4057 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004058
Ethan Furmancdc08792016-06-02 15:06:09 -07004059 def test_garbage_in_exception_out(self):
4060 vapor = type('blah', (), {})
4061 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004062 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004063
4064 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004065 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004066
Brett Cannon044283a2016-07-15 10:41:49 -07004067 def test_bad_pathlike(self):
4068 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004069 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004070 # __fspath__ attribute that is not callable.
4071 c = type('foo', (), {})
4072 c.__fspath__ = 1
4073 self.assertRaises(TypeError, self.fspath, c())
4074 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004075 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004076 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004077
Bar Hareleae87e32019-12-22 11:57:27 +02004078 def test_pathlike_subclasshook(self):
4079 # bpo-38878: subclasshook causes subclass checks
4080 # true on abstract implementation.
4081 class A(os.PathLike):
4082 pass
4083 self.assertFalse(issubclass(FakePath, A))
4084 self.assertTrue(issubclass(FakePath, os.PathLike))
4085
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004086 def test_pathlike_class_getitem(self):
4087 self.assertIs(os.PathLike[bytes], os.PathLike)
4088
Victor Stinnerc29b5852017-11-02 07:28:27 -07004089
4090class TimesTests(unittest.TestCase):
4091 def test_times(self):
4092 times = os.times()
4093 self.assertIsInstance(times, os.times_result)
4094
4095 for field in ('user', 'system', 'children_user', 'children_system',
4096 'elapsed'):
4097 value = getattr(times, field)
4098 self.assertIsInstance(value, float)
4099
4100 if os.name == 'nt':
4101 self.assertEqual(times.children_user, 0)
4102 self.assertEqual(times.children_system, 0)
4103 self.assertEqual(times.elapsed, 0)
4104
4105
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004106# Only test if the C version is provided, otherwise TestPEP519 already tested
4107# the pure Python implementation.
4108if hasattr(os, "_fspath"):
4109 class TestPEP519PurePython(TestPEP519):
4110
4111 """Explicitly test the pure Python implementation of os.fspath()."""
4112
4113 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004114
4115
Fred Drake2e2be372001-09-20 21:33:42 +00004116if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004117 unittest.main()