blob: 50535da0a2bfc10e3bae54a46a4c3571842ad20c [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
R David Murrayf2ad1732014-12-25 18:36:56 -050060root_in_posix = False
61if hasattr(os, 'geteuid'):
62 root_in_posix = (os.geteuid() == 0)
63
Mark Dickinson7cf03892010-04-16 13:45:35 +000064# Detect whether we're on a Linux system that uses the (now outdated
65# and unmaintained) linuxthreads threading library. There's an issue
66# when combining linuxthreads with a failed execv call: see
67# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020068if hasattr(sys, 'thread_info') and sys.thread_info.version:
69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
70else:
71 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000072
Stefan Krahebee49a2013-01-17 15:31:00 +010073# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
Berker Peksag4af23d72016-09-15 20:32:44 +030077def requires_os_func(name):
78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
79
80
Victor Stinnerae39d232016-03-24 17:12:55 +010081def create_file(filename, content=b'content'):
82 with open(filename, "xb", 0) as fp:
83 fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
151class FileTests(unittest.TestCase):
152 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000153 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000155 tearDown = setUp
156
157 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000159 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000160 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161
Christian Heimesfdab48e2008-01-20 09:06:41 +0000162 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
164 # We must allocate two consecutive file descriptors, otherwise
165 # it will mess up other file descriptors (perhaps even the three
166 # standard ones).
167 second = os.dup(first)
168 try:
169 retries = 0
170 while second != first + 1:
171 os.close(first)
172 retries += 1
173 if retries > 10:
174 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000175 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000176 first, second = second, os.dup(second)
177 finally:
178 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000179 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000180 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000181 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000182
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000183 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000184 def test_rename(self):
185 path = support.TESTFN
186 old = sys.getrefcount(path)
187 self.assertRaises(TypeError, os.rename, path, 0)
188 new = sys.getrefcount(path)
189 self.assertEqual(old, new)
190
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000191 def test_read(self):
192 with open(support.TESTFN, "w+b") as fobj:
193 fobj.write(b"spam")
194 fobj.flush()
195 fd = fobj.fileno()
196 os.lseek(fd, 0, 0)
197 s = os.read(fd, 4)
198 self.assertEqual(type(s), bytes)
199 self.assertEqual(s, b"spam")
200
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200201 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200202 # Skip the test on 32-bit platforms: the number of bytes must fit in a
203 # Py_ssize_t type
204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
205 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
207 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200208 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200210
211 # Issue #21932: Make sure that os.read() does not raise an
212 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213 with open(support.TESTFN, "rb") as fp:
214 data = os.read(fp.fileno(), size)
215
Victor Stinner8c663fd2017-11-08 14:44:44 -0800216 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200217 # operating system is free to return less bytes than requested.
218 self.assertEqual(data, b'test')
219
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000220 def test_write(self):
221 # os.write() accepts bytes- and buffer-like objects but not strings
222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
223 self.assertRaises(TypeError, os.write, fd, "beans")
224 os.write(fd, b"bacon\n")
225 os.write(fd, bytearray(b"eggs\n"))
226 os.write(fd, memoryview(b"spam\n"))
227 os.close(fd)
228 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000229 self.assertEqual(fobj.read().splitlines(),
230 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000231
Victor Stinnere0daff12011-03-20 23:36:35 +0100232 def write_windows_console(self, *args):
233 retcode = subprocess.call(args,
234 # use a new console to not flood the test output
235 creationflags=subprocess.CREATE_NEW_CONSOLE,
236 # use a shell to hide the console window (SW_HIDE)
237 shell=True)
238 self.assertEqual(retcode, 0)
239
240 @unittest.skipUnless(sys.platform == 'win32',
241 'test specific to the Windows console')
242 def test_write_windows_console(self):
243 # Issue #11395: the Windows console returns an error (12: not enough
244 # space error) on writing into stdout if stdout mode is binary and the
245 # length is greater than 66,000 bytes (or less, depending on heap
246 # usage).
247 code = "print('x' * 100000)"
248 self.write_windows_console(sys.executable, "-c", code)
249 self.write_windows_console(sys.executable, "-u", "-c", code)
250
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000251 def fdopen_helper(self, *args):
252 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253 f = os.fdopen(fd, *args)
254 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000255
256 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
258 os.close(fd)
259
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000260 self.fdopen_helper()
261 self.fdopen_helper('r')
262 self.fdopen_helper('r', 100)
263
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100264 def test_replace(self):
265 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100266 self.addCleanup(support.unlink, support.TESTFN)
267 self.addCleanup(support.unlink, TESTFN2)
268
269 create_file(support.TESTFN, b"1")
270 create_file(TESTFN2, b"2")
271
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100272 os.replace(support.TESTFN, TESTFN2)
273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
274 with open(TESTFN2, 'r') as f:
275 self.assertEqual(f.read(), "1")
276
Martin Panterbf19d162015-09-09 01:01:13 +0000277 def test_open_keywords(self):
278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
279 dir_fd=None)
280 os.close(f)
281
282 def test_symlink_keywords(self):
283 symlink = support.get_attribute(os, "symlink")
284 try:
285 symlink(src='target', dst=support.TESTFN,
286 target_is_directory=False, dir_fd=None)
287 except (NotImplementedError, OSError):
288 pass # No OS support or unprivileged user
289
Pablo Galindoaac4d032019-05-31 19:39:47 +0100290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
291 def test_copy_file_range_invalid_values(self):
292 with self.assertRaises(ValueError):
293 os.copy_file_range(0, 1, -10)
294
295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
296 def test_copy_file_range(self):
297 TESTFN2 = support.TESTFN + ".3"
298 data = b'0123456789'
299
300 create_file(support.TESTFN, data)
301 self.addCleanup(support.unlink, support.TESTFN)
302
303 in_file = open(support.TESTFN, 'rb')
304 self.addCleanup(in_file.close)
305 in_fd = in_file.fileno()
306
307 out_file = open(TESTFN2, 'w+b')
308 self.addCleanup(support.unlink, TESTFN2)
309 self.addCleanup(out_file.close)
310 out_fd = out_file.fileno()
311
312 try:
313 i = os.copy_file_range(in_fd, out_fd, 5)
314 except OSError as e:
315 # Handle the case in which Python was compiled
316 # in a system with the syscall but without support
317 # in the kernel.
318 if e.errno != errno.ENOSYS:
319 raise
320 self.skipTest(e)
321 else:
322 # The number of copied bytes can be less than
323 # the number of bytes originally requested.
324 self.assertIn(i, range(0, 6));
325
326 with open(TESTFN2, 'rb') as in_file:
327 self.assertEqual(in_file.read(), data[:i])
328
329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
330 def test_copy_file_range_offset(self):
331 TESTFN4 = support.TESTFN + ".4"
332 data = b'0123456789'
333 bytes_to_copy = 6
334 in_skip = 3
335 out_seek = 5
336
337 create_file(support.TESTFN, data)
338 self.addCleanup(support.unlink, support.TESTFN)
339
340 in_file = open(support.TESTFN, 'rb')
341 self.addCleanup(in_file.close)
342 in_fd = in_file.fileno()
343
344 out_file = open(TESTFN4, 'w+b')
345 self.addCleanup(support.unlink, TESTFN4)
346 self.addCleanup(out_file.close)
347 out_fd = out_file.fileno()
348
349 try:
350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
351 offset_src=in_skip,
352 offset_dst=out_seek)
353 except OSError as e:
354 # Handle the case in which Python was compiled
355 # in a system with the syscall but without support
356 # in the kernel.
357 if e.errno != errno.ENOSYS:
358 raise
359 self.skipTest(e)
360 else:
361 # The number of copied bytes can be less than
362 # the number of bytes originally requested.
363 self.assertIn(i, range(0, bytes_to_copy+1));
364
365 with open(TESTFN4, 'rb') as in_file:
366 read = in_file.read()
367 # seeked bytes (5) are zero'ed
368 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
369 # 012 are skipped (in_skip)
370 # 345678 are copied in the file (in_skip + bytes_to_copy)
371 self.assertEqual(read[out_seek:],
372 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Steve Dower772ec0f2019-09-04 14:42:54 -0700594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
595 def test_stat_block_device(self):
596 # bpo-38030: os.stat fails for block devices
597 # Test a filename like "//./C:"
598 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
599 result = os.stat(fname)
600 self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
723 def _test_utime_current(self, set_time):
724 # Get the system clock
725 current = time.time()
726
727 # Call os.utime() to set the timestamp to the current system clock
728 set_time(self.fname)
729
730 if not self.support_subsecond(self.fname):
731 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700732 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200733 # On Windows, the usual resolution of time.time() is 15.6 ms.
734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 #
736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
737 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200738 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200739 st = os.stat(self.fname)
740 msg = ("st_time=%r, current=%r, dt=%r"
741 % (st.st_mtime, current, st.st_mtime - current))
742 self.assertAlmostEqual(st.st_mtime, current,
743 delta=delta, msg=msg)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
757 def get_file_system(self, path):
758 if sys.platform == 'win32':
759 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
760 import ctypes
761 kernel32 = ctypes.windll.kernel32
762 buf = ctypes.create_unicode_buffer("", 100)
763 ok = kernel32.GetVolumeInformationW(root, None, 0,
764 None, None, None,
765 buf, len(buf))
766 if ok:
767 return buf.value
768 # return None if the filesystem is unknown
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000827class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000828 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000829 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000830
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000831 def setUp(self):
832 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000834 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000835 for key, value in self._reference().items():
836 os.environ[key] = value
837
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000838 def tearDown(self):
839 os.environ.clear()
840 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000841 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000842 os.environb.clear()
843 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000844
Christian Heimes90333392007-11-01 19:08:42 +0000845 def _reference(self):
846 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
847
848 def _empty_mapping(self):
849 os.environ.clear()
850 return os.environ
851
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000852 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200853 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
854 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000855 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000856 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300857 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200858 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300859 value = popen.read().strip()
860 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000861
Xavier de Gayed1415312016-07-22 12:15:29 +0200862 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
863 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000864 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200865 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
866 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300867 it = iter(popen)
868 self.assertEqual(next(it), "line1\n")
869 self.assertEqual(next(it), "line2\n")
870 self.assertEqual(next(it), "line3\n")
871 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000872
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000873 # Verify environ keys and values from the OS are of the
874 # correct str type.
875 def test_keyvalue_types(self):
876 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000877 self.assertEqual(type(key), str)
878 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000879
Christian Heimes90333392007-11-01 19:08:42 +0000880 def test_items(self):
881 for key, value in self._reference().items():
882 self.assertEqual(os.environ.get(key), value)
883
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000884 # Issue 7310
885 def test___repr__(self):
886 """Check that the repr() of os.environ looks like environ({...})."""
887 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000888 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
889 '{!r}: {!r}'.format(key, value)
890 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000891
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000892 def test_get_exec_path(self):
893 defpath_list = os.defpath.split(os.pathsep)
894 test_path = ['/monty', '/python', '', '/flying/circus']
895 test_env = {'PATH': os.pathsep.join(test_path)}
896
897 saved_environ = os.environ
898 try:
899 os.environ = dict(test_env)
900 # Test that defaulting to os.environ works.
901 self.assertSequenceEqual(test_path, os.get_exec_path())
902 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
903 finally:
904 os.environ = saved_environ
905
906 # No PATH environment variable
907 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
908 # Empty PATH environment variable
909 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
910 # Supplied PATH environment variable
911 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
912
Victor Stinnerb745a742010-05-18 17:17:23 +0000913 if os.supports_bytes_environ:
914 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000915 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000916 # ignore BytesWarning warning
917 with warnings.catch_warnings(record=True):
918 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000919 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000920 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000921 pass
922 else:
923 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000924
925 # bytes key and/or value
926 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
927 ['abc'])
928 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
929 ['abc'])
930 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
931 ['abc'])
932
933 @unittest.skipUnless(os.supports_bytes_environ,
934 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000935 def test_environb(self):
936 # os.environ -> os.environb
937 value = 'euro\u20ac'
938 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000939 value_bytes = value.encode(sys.getfilesystemencoding(),
940 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000941 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000942 msg = "U+20AC character is not encodable to %s" % (
943 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000944 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000945 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(os.environ['unicode'], value)
947 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000948
949 # os.environb -> os.environ
950 value = b'\xff'
951 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000953 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000954 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000955
Victor Stinner13ff2452018-01-22 18:32:50 +0100956 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100957 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100958 def test_unset_error(self):
Victor Stinner56cd3712020-01-21 16:13:09 +0100959 # "=" is not allowed in a variable name
960 key = 'key='
961 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100962
Victor Stinner6d101392013-04-14 16:35:04 +0200963 def test_key_type(self):
964 missing = 'missingkey'
965 self.assertNotIn(missing, os.environ)
966
Victor Stinner839e5ea2013-04-14 16:43:03 +0200967 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200968 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200969 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200970 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200971
Victor Stinner839e5ea2013-04-14 16:43:03 +0200972 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200973 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200974 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200975 self.assertTrue(cm.exception.__suppress_context__)
976
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300977 def _test_environ_iteration(self, collection):
978 iterator = iter(collection)
979 new_key = "__new_key__"
980
981 next(iterator) # start iteration over os.environ.items
982
983 # add a new key in os.environ mapping
984 os.environ[new_key] = "test_environ_iteration"
985
986 try:
987 next(iterator) # force iteration over modified mapping
988 self.assertEqual(os.environ[new_key], "test_environ_iteration")
989 finally:
990 del os.environ[new_key]
991
992 def test_iter_error_when_changing_os_environ(self):
993 self._test_environ_iteration(os.environ)
994
995 def test_iter_error_when_changing_os_environ_items(self):
996 self._test_environ_iteration(os.environ.items())
997
998 def test_iter_error_when_changing_os_environ_values(self):
999 self._test_environ_iteration(os.environ.values())
1000
Victor Stinner6d101392013-04-14 16:35:04 +02001001
Tim Petersc4e09402003-04-25 07:11:48 +00001002class WalkTests(unittest.TestCase):
1003 """Tests for os.walk()."""
1004
Victor Stinner0561c532015-03-12 10:28:24 +01001005 # Wrapper to hide minor differences between os.walk and os.fwalk
1006 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001007 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001008 if 'follow_symlinks' in kwargs:
1009 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001010 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001011
Charles-François Natali7372b062012-02-05 15:15:38 +01001012 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001013 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001014 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001015
1016 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001017 # TESTFN/
1018 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001019 # tmp1
1020 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001021 # tmp2
1022 # SUB11/ no kids
1023 # SUB2/ a file kid and a dirsymlink kid
1024 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001025 # SUB21/ not readable
1026 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001027 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001028 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001029 # broken_link2
1030 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001031 # TEST2/
1032 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001033 self.walk_path = join(support.TESTFN, "TEST1")
1034 self.sub1_path = join(self.walk_path, "SUB1")
1035 self.sub11_path = join(self.sub1_path, "SUB11")
1036 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001037 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001038 tmp1_path = join(self.walk_path, "tmp1")
1039 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001040 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001041 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001042 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001043 t2_path = join(support.TESTFN, "TEST2")
1044 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001045 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001046 broken_link2_path = join(sub2_path, "broken_link2")
1047 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001048
1049 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001050 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001051 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001052 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001053 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001054
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001055 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001056 with open(path, "x") as f:
1057 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001058
Victor Stinner0561c532015-03-12 10:28:24 +01001059 if support.can_symlink():
1060 os.symlink(os.path.abspath(t2_path), self.link_path)
1061 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001062 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1063 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001064 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001065 ["broken_link", "broken_link2", "broken_link3",
1066 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001067 else:
pxinwr3e028b22019-02-15 13:04:47 +08001068 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001069
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001070 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001071 try:
1072 os.listdir(sub21_path)
1073 except PermissionError:
1074 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1075 else:
1076 os.chmod(sub21_path, stat.S_IRWXU)
1077 os.unlink(tmp5_path)
1078 os.rmdir(sub21_path)
1079 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001080
Victor Stinner0561c532015-03-12 10:28:24 +01001081 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001082 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001083 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001084
Tim Petersc4e09402003-04-25 07:11:48 +00001085 self.assertEqual(len(all), 4)
1086 # We can't know which order SUB1 and SUB2 will appear in.
1087 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1088 # flipped: TESTFN, SUB2, SUB1, SUB11
1089 flipped = all[0][1][0] != "SUB1"
1090 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001091 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001092 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001093 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1094 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1095 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1096 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001097
Brett Cannon3f9183b2016-08-26 14:44:48 -07001098 def test_walk_prune(self, walk_path=None):
1099 if walk_path is None:
1100 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001101 # Prune the search.
1102 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001103 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001104 all.append((root, dirs, files))
1105 # Don't descend into SUB1.
1106 if 'SUB1' in dirs:
1107 # Note that this also mutates the dirs we appended to all!
1108 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001109
Victor Stinner0561c532015-03-12 10:28:24 +01001110 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001111 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001112
1113 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001114 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001115 self.assertEqual(all[1], self.sub2_tree)
1116
Brett Cannon3f9183b2016-08-26 14:44:48 -07001117 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001118 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001119
Victor Stinner0561c532015-03-12 10:28:24 +01001120 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001121 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001122 all = list(self.walk(self.walk_path, topdown=False))
1123
Victor Stinner53b0a412016-03-26 01:12:36 +01001124 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001125 # We can't know which order SUB1 and SUB2 will appear in.
1126 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1127 # flipped: SUB2, SUB11, SUB1, TESTFN
1128 flipped = all[3][1][0] != "SUB1"
1129 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001130 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001131 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001132 self.assertEqual(all[3],
1133 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1134 self.assertEqual(all[flipped],
1135 (self.sub11_path, [], []))
1136 self.assertEqual(all[flipped + 1],
1137 (self.sub1_path, ["SUB11"], ["tmp2"]))
1138 self.assertEqual(all[2 - 2 * flipped],
1139 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001140
Victor Stinner0561c532015-03-12 10:28:24 +01001141 def test_walk_symlink(self):
1142 if not support.can_symlink():
1143 self.skipTest("need symlink support")
1144
1145 # Walk, following symlinks.
1146 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1147 for root, dirs, files in walk_it:
1148 if root == self.link_path:
1149 self.assertEqual(dirs, [])
1150 self.assertEqual(files, ["tmp4"])
1151 break
1152 else:
1153 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001154
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001155 def test_walk_bad_dir(self):
1156 # Walk top-down.
1157 errors = []
1158 walk_it = self.walk(self.walk_path, onerror=errors.append)
1159 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001160 self.assertEqual(errors, [])
1161 dir1 = 'SUB1'
1162 path1 = os.path.join(root, dir1)
1163 path1new = os.path.join(root, dir1 + '.new')
1164 os.rename(path1, path1new)
1165 try:
1166 roots = [r for r, d, f in walk_it]
1167 self.assertTrue(errors)
1168 self.assertNotIn(path1, roots)
1169 self.assertNotIn(path1new, roots)
1170 for dir2 in dirs:
1171 if dir2 != dir1:
1172 self.assertIn(os.path.join(root, dir2), roots)
1173 finally:
1174 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001175
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001176 def test_walk_many_open_files(self):
1177 depth = 30
1178 base = os.path.join(support.TESTFN, 'deep')
1179 p = os.path.join(base, *(['d']*depth))
1180 os.makedirs(p)
1181
1182 iters = [self.walk(base, topdown=False) for j in range(100)]
1183 for i in range(depth + 1):
1184 expected = (p, ['d'] if i else [], [])
1185 for it in iters:
1186 self.assertEqual(next(it), expected)
1187 p = os.path.dirname(p)
1188
1189 iters = [self.walk(base, topdown=True) for j in range(100)]
1190 p = base
1191 for i in range(depth + 1):
1192 expected = (p, ['d'] if i < depth else [], [])
1193 for it in iters:
1194 self.assertEqual(next(it), expected)
1195 p = os.path.join(p, 'd')
1196
Charles-François Natali7372b062012-02-05 15:15:38 +01001197
1198@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1199class FwalkTests(WalkTests):
1200 """Tests for os.fwalk()."""
1201
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001202 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001203 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001204 yield (root, dirs, files)
1205
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001206 def fwalk(self, *args, **kwargs):
1207 return os.fwalk(*args, **kwargs)
1208
Larry Hastingsc48fe982012-06-25 04:49:05 -07001209 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1210 """
1211 compare with walk() results.
1212 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001213 walk_kwargs = walk_kwargs.copy()
1214 fwalk_kwargs = fwalk_kwargs.copy()
1215 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1216 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1217 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001218
Charles-François Natali7372b062012-02-05 15:15:38 +01001219 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001220 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001221 expected[root] = (set(dirs), set(files))
1222
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001223 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001224 self.assertIn(root, expected)
1225 self.assertEqual(expected[root], (set(dirs), set(files)))
1226
Larry Hastingsc48fe982012-06-25 04:49:05 -07001227 def test_compare_to_walk(self):
1228 kwargs = {'top': support.TESTFN}
1229 self._compare_to_walk(kwargs, kwargs)
1230
Charles-François Natali7372b062012-02-05 15:15:38 +01001231 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001232 try:
1233 fd = os.open(".", os.O_RDONLY)
1234 walk_kwargs = {'top': support.TESTFN}
1235 fwalk_kwargs = walk_kwargs.copy()
1236 fwalk_kwargs['dir_fd'] = fd
1237 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1238 finally:
1239 os.close(fd)
1240
1241 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001242 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001243 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1244 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001245 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001246 # check that the FD is valid
1247 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001248 # redundant check
1249 os.stat(rootfd)
1250 # check that listdir() returns consistent information
1251 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001252
1253 def test_fd_leak(self):
1254 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1255 # we both check that calling fwalk() a large number of times doesn't
1256 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1257 minfd = os.dup(1)
1258 os.close(minfd)
1259 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001260 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001261 pass
1262 newfd = os.dup(1)
1263 self.addCleanup(os.close, newfd)
1264 self.assertEqual(newfd, minfd)
1265
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001266 # fwalk() keeps file descriptors open
1267 test_walk_many_open_files = None
1268
1269
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001270class BytesWalkTests(WalkTests):
1271 """Tests for os.walk() with bytes."""
1272 def walk(self, top, **kwargs):
1273 if 'follow_symlinks' in kwargs:
1274 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1275 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1276 root = os.fsdecode(broot)
1277 dirs = list(map(os.fsdecode, bdirs))
1278 files = list(map(os.fsdecode, bfiles))
1279 yield (root, dirs, files)
1280 bdirs[:] = list(map(os.fsencode, dirs))
1281 bfiles[:] = list(map(os.fsencode, files))
1282
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001283@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1284class BytesFwalkTests(FwalkTests):
1285 """Tests for os.walk() with bytes."""
1286 def fwalk(self, top='.', *args, **kwargs):
1287 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1288 root = os.fsdecode(broot)
1289 dirs = list(map(os.fsdecode, bdirs))
1290 files = list(map(os.fsdecode, bfiles))
1291 yield (root, dirs, files, topfd)
1292 bdirs[:] = list(map(os.fsencode, dirs))
1293 bfiles[:] = list(map(os.fsencode, files))
1294
Charles-François Natali7372b062012-02-05 15:15:38 +01001295
Guido van Rossume7ba4952007-06-06 23:52:48 +00001296class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001297 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001298 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001299
1300 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001301 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001302 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1303 os.makedirs(path) # Should work
1304 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1305 os.makedirs(path)
1306
1307 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001308 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001309 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1310 os.makedirs(path)
1311 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1312 'dir5', 'dir6')
1313 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001314
Serhiy Storchakae304e332017-03-24 13:27:42 +02001315 def test_mode(self):
1316 with support.temp_umask(0o002):
1317 base = support.TESTFN
1318 parent = os.path.join(base, 'dir1')
1319 path = os.path.join(parent, 'dir2')
1320 os.makedirs(path, 0o555)
1321 self.assertTrue(os.path.exists(path))
1322 self.assertTrue(os.path.isdir(path))
1323 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001324 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1325 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001326
Terry Reedy5a22b652010-12-02 07:05:56 +00001327 def test_exist_ok_existing_directory(self):
1328 path = os.path.join(support.TESTFN, 'dir1')
1329 mode = 0o777
1330 old_mask = os.umask(0o022)
1331 os.makedirs(path, mode)
1332 self.assertRaises(OSError, os.makedirs, path, mode)
1333 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001334 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001335 os.makedirs(path, mode=mode, exist_ok=True)
1336 os.umask(old_mask)
1337
Martin Pantera82642f2015-11-19 04:48:44 +00001338 # Issue #25583: A drive root could raise PermissionError on Windows
1339 os.makedirs(os.path.abspath('/'), exist_ok=True)
1340
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001341 def test_exist_ok_s_isgid_directory(self):
1342 path = os.path.join(support.TESTFN, 'dir1')
1343 S_ISGID = stat.S_ISGID
1344 mode = 0o777
1345 old_mask = os.umask(0o022)
1346 try:
1347 existing_testfn_mode = stat.S_IMODE(
1348 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001349 try:
1350 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001351 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001352 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001353 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1354 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1355 # The os should apply S_ISGID from the parent dir for us, but
1356 # this test need not depend on that behavior. Be explicit.
1357 os.makedirs(path, mode | S_ISGID)
1358 # http://bugs.python.org/issue14992
1359 # Should not fail when the bit is already set.
1360 os.makedirs(path, mode, exist_ok=True)
1361 # remove the bit.
1362 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001363 # May work even when the bit is not already set when demanded.
1364 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001365 finally:
1366 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001367
1368 def test_exist_ok_existing_regular_file(self):
1369 base = support.TESTFN
1370 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001371 with open(path, 'w') as f:
1372 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001373 self.assertRaises(OSError, os.makedirs, path)
1374 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1375 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1376 os.remove(path)
1377
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001378 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001379 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001380 'dir4', 'dir5', 'dir6')
1381 # If the tests failed, the bottom-most directory ('../dir6')
1382 # may not have been created, so we look for the outermost directory
1383 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001384 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001385 path = os.path.dirname(path)
1386
1387 os.removedirs(path)
1388
Andrew Svetlov405faed2012-12-25 12:18:09 +02001389
R David Murrayf2ad1732014-12-25 18:36:56 -05001390@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1391class ChownFileTests(unittest.TestCase):
1392
Berker Peksag036a71b2015-07-21 09:29:48 +03001393 @classmethod
1394 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001395 os.mkdir(support.TESTFN)
1396
1397 def test_chown_uid_gid_arguments_must_be_index(self):
1398 stat = os.stat(support.TESTFN)
1399 uid = stat.st_uid
1400 gid = stat.st_gid
1401 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1402 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1403 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1404 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1405 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1406
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001407 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1408 def test_chown_gid(self):
1409 groups = os.getgroups()
1410 if len(groups) < 2:
1411 self.skipTest("test needs at least 2 groups")
1412
R David Murrayf2ad1732014-12-25 18:36:56 -05001413 gid_1, gid_2 = groups[:2]
1414 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001415
R David Murrayf2ad1732014-12-25 18:36:56 -05001416 os.chown(support.TESTFN, uid, gid_1)
1417 gid = os.stat(support.TESTFN).st_gid
1418 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001419
R David Murrayf2ad1732014-12-25 18:36:56 -05001420 os.chown(support.TESTFN, uid, gid_2)
1421 gid = os.stat(support.TESTFN).st_gid
1422 self.assertEqual(gid, gid_2)
1423
1424 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1425 "test needs root privilege and more than one user")
1426 def test_chown_with_root(self):
1427 uid_1, uid_2 = all_users[:2]
1428 gid = os.stat(support.TESTFN).st_gid
1429 os.chown(support.TESTFN, uid_1, gid)
1430 uid = os.stat(support.TESTFN).st_uid
1431 self.assertEqual(uid, uid_1)
1432 os.chown(support.TESTFN, uid_2, gid)
1433 uid = os.stat(support.TESTFN).st_uid
1434 self.assertEqual(uid, uid_2)
1435
1436 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1437 "test needs non-root account and more than one user")
1438 def test_chown_without_permission(self):
1439 uid_1, uid_2 = all_users[:2]
1440 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001441 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001442 os.chown(support.TESTFN, uid_1, gid)
1443 os.chown(support.TESTFN, uid_2, gid)
1444
Berker Peksag036a71b2015-07-21 09:29:48 +03001445 @classmethod
1446 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001447 os.rmdir(support.TESTFN)
1448
1449
Andrew Svetlov405faed2012-12-25 12:18:09 +02001450class RemoveDirsTests(unittest.TestCase):
1451 def setUp(self):
1452 os.makedirs(support.TESTFN)
1453
1454 def tearDown(self):
1455 support.rmtree(support.TESTFN)
1456
1457 def test_remove_all(self):
1458 dira = os.path.join(support.TESTFN, 'dira')
1459 os.mkdir(dira)
1460 dirb = os.path.join(dira, 'dirb')
1461 os.mkdir(dirb)
1462 os.removedirs(dirb)
1463 self.assertFalse(os.path.exists(dirb))
1464 self.assertFalse(os.path.exists(dira))
1465 self.assertFalse(os.path.exists(support.TESTFN))
1466
1467 def test_remove_partial(self):
1468 dira = os.path.join(support.TESTFN, 'dira')
1469 os.mkdir(dira)
1470 dirb = os.path.join(dira, 'dirb')
1471 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001472 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001473 os.removedirs(dirb)
1474 self.assertFalse(os.path.exists(dirb))
1475 self.assertTrue(os.path.exists(dira))
1476 self.assertTrue(os.path.exists(support.TESTFN))
1477
1478 def test_remove_nothing(self):
1479 dira = os.path.join(support.TESTFN, 'dira')
1480 os.mkdir(dira)
1481 dirb = os.path.join(dira, 'dirb')
1482 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001483 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001484 with self.assertRaises(OSError):
1485 os.removedirs(dirb)
1486 self.assertTrue(os.path.exists(dirb))
1487 self.assertTrue(os.path.exists(dira))
1488 self.assertTrue(os.path.exists(support.TESTFN))
1489
1490
Guido van Rossume7ba4952007-06-06 23:52:48 +00001491class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001492 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001493 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001494 f.write(b'hello')
1495 f.close()
1496 with open(os.devnull, 'rb') as f:
1497 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001498
Andrew Svetlov405faed2012-12-25 12:18:09 +02001499
Guido van Rossume7ba4952007-06-06 23:52:48 +00001500class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001501 def test_urandom_length(self):
1502 self.assertEqual(len(os.urandom(0)), 0)
1503 self.assertEqual(len(os.urandom(1)), 1)
1504 self.assertEqual(len(os.urandom(10)), 10)
1505 self.assertEqual(len(os.urandom(100)), 100)
1506 self.assertEqual(len(os.urandom(1000)), 1000)
1507
1508 def test_urandom_value(self):
1509 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001510 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001511 data2 = os.urandom(16)
1512 self.assertNotEqual(data1, data2)
1513
1514 def get_urandom_subprocess(self, count):
1515 code = '\n'.join((
1516 'import os, sys',
1517 'data = os.urandom(%s)' % count,
1518 'sys.stdout.buffer.write(data)',
1519 'sys.stdout.buffer.flush()'))
1520 out = assert_python_ok('-c', code)
1521 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001522 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001523 return stdout
1524
1525 def test_urandom_subprocess(self):
1526 data1 = self.get_urandom_subprocess(16)
1527 data2 = self.get_urandom_subprocess(16)
1528 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001529
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001530
Victor Stinner9b1f4742016-09-06 16:18:52 -07001531@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1532class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001533 @classmethod
1534 def setUpClass(cls):
1535 try:
1536 os.getrandom(1)
1537 except OSError as exc:
1538 if exc.errno == errno.ENOSYS:
1539 # Python compiled on a more recent Linux version
1540 # than the current Linux kernel
1541 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1542 else:
1543 raise
1544
Victor Stinner9b1f4742016-09-06 16:18:52 -07001545 def test_getrandom_type(self):
1546 data = os.getrandom(16)
1547 self.assertIsInstance(data, bytes)
1548 self.assertEqual(len(data), 16)
1549
1550 def test_getrandom0(self):
1551 empty = os.getrandom(0)
1552 self.assertEqual(empty, b'')
1553
1554 def test_getrandom_random(self):
1555 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1556
1557 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1558 # resource /dev/random
1559
1560 def test_getrandom_nonblock(self):
1561 # The call must not fail. Check also that the flag exists
1562 try:
1563 os.getrandom(1, os.GRND_NONBLOCK)
1564 except BlockingIOError:
1565 # System urandom is not initialized yet
1566 pass
1567
1568 def test_getrandom_value(self):
1569 data1 = os.getrandom(16)
1570 data2 = os.getrandom(16)
1571 self.assertNotEqual(data1, data2)
1572
1573
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001574# os.urandom() doesn't use a file descriptor when it is implemented with the
1575# getentropy() function, the getrandom() function or the getrandom() syscall
1576OS_URANDOM_DONT_USE_FD = (
1577 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1578 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1579 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001580
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001581@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1582 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001583@unittest.skipIf(sys.platform == "vxworks",
1584 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001585class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001586 @unittest.skipUnless(resource, "test requires the resource module")
1587 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001588 # Check urandom() failing when it is not able to open /dev/random.
1589 # We spawn a new process to make the test more robust (if getrlimit()
1590 # failed to restore the file descriptor limit after this, the whole
1591 # test suite would crash; this actually happened on the OS X Tiger
1592 # buildbot).
1593 code = """if 1:
1594 import errno
1595 import os
1596 import resource
1597
1598 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1599 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1600 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001601 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001602 except OSError as e:
1603 assert e.errno == errno.EMFILE, e.errno
1604 else:
1605 raise AssertionError("OSError not raised")
1606 """
1607 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001608
Antoine Pitroue472aea2014-04-26 14:33:03 +02001609 def test_urandom_fd_closed(self):
1610 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1611 # closed.
1612 code = """if 1:
1613 import os
1614 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001615 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001616 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001617 with test.support.SuppressCrashReport():
1618 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001619 sys.stdout.buffer.write(os.urandom(4))
1620 """
1621 rc, out, err = assert_python_ok('-Sc', code)
1622
1623 def test_urandom_fd_reopened(self):
1624 # Issue #21207: urandom() should detect its fd to /dev/urandom
1625 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001626 self.addCleanup(support.unlink, support.TESTFN)
1627 create_file(support.TESTFN, b"x" * 256)
1628
Antoine Pitroue472aea2014-04-26 14:33:03 +02001629 code = """if 1:
1630 import os
1631 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001632 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001633 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001634 with test.support.SuppressCrashReport():
1635 for fd in range(3, 256):
1636 try:
1637 os.close(fd)
1638 except OSError:
1639 pass
1640 else:
1641 # Found the urandom fd (XXX hopefully)
1642 break
1643 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001644 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001645 new_fd = f.fileno()
1646 # Issue #26935: posix allows new_fd and fd to be equal but
1647 # some libc implementations have dup2 return an error in this
1648 # case.
1649 if new_fd != fd:
1650 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001651 sys.stdout.buffer.write(os.urandom(4))
1652 sys.stdout.buffer.write(os.urandom(4))
1653 """.format(TESTFN=support.TESTFN)
1654 rc, out, err = assert_python_ok('-Sc', code)
1655 self.assertEqual(len(out), 8)
1656 self.assertNotEqual(out[0:4], out[4:8])
1657 rc, out2, err2 = assert_python_ok('-Sc', code)
1658 self.assertEqual(len(out2), 8)
1659 self.assertNotEqual(out2, out)
1660
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001661
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001662@contextlib.contextmanager
1663def _execvpe_mockup(defpath=None):
1664 """
1665 Stubs out execv and execve functions when used as context manager.
1666 Records exec calls. The mock execv and execve functions always raise an
1667 exception as they would normally never return.
1668 """
1669 # A list of tuples containing (function name, first arg, args)
1670 # of calls to execv or execve that have been made.
1671 calls = []
1672
1673 def mock_execv(name, *args):
1674 calls.append(('execv', name, args))
1675 raise RuntimeError("execv called")
1676
1677 def mock_execve(name, *args):
1678 calls.append(('execve', name, args))
1679 raise OSError(errno.ENOTDIR, "execve called")
1680
1681 try:
1682 orig_execv = os.execv
1683 orig_execve = os.execve
1684 orig_defpath = os.defpath
1685 os.execv = mock_execv
1686 os.execve = mock_execve
1687 if defpath is not None:
1688 os.defpath = defpath
1689 yield calls
1690 finally:
1691 os.execv = orig_execv
1692 os.execve = orig_execve
1693 os.defpath = orig_defpath
1694
pxinwrf2d7ac72019-05-21 18:46:37 +08001695@unittest.skipUnless(hasattr(os, 'execv'),
1696 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001697class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001698 @unittest.skipIf(USING_LINUXTHREADS,
1699 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001700 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001701 self.assertRaises(OSError, os.execvpe, 'no such app-',
1702 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001703
Steve Dowerbce26262016-11-19 19:17:26 -08001704 def test_execv_with_bad_arglist(self):
1705 self.assertRaises(ValueError, os.execv, 'notepad', ())
1706 self.assertRaises(ValueError, os.execv, 'notepad', [])
1707 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1708 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1709
Thomas Heller6790d602007-08-30 17:15:14 +00001710 def test_execvpe_with_bad_arglist(self):
1711 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001712 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1713 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001714
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001715 @unittest.skipUnless(hasattr(os, '_execvpe'),
1716 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001717 def _test_internal_execvpe(self, test_type):
1718 program_path = os.sep + 'absolutepath'
1719 if test_type is bytes:
1720 program = b'executable'
1721 fullpath = os.path.join(os.fsencode(program_path), program)
1722 native_fullpath = fullpath
1723 arguments = [b'progname', 'arg1', 'arg2']
1724 else:
1725 program = 'executable'
1726 arguments = ['progname', 'arg1', 'arg2']
1727 fullpath = os.path.join(program_path, program)
1728 if os.name != "nt":
1729 native_fullpath = os.fsencode(fullpath)
1730 else:
1731 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001732 env = {'spam': 'beans'}
1733
Victor Stinnerb745a742010-05-18 17:17:23 +00001734 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001735 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001736 self.assertRaises(RuntimeError,
1737 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001738 self.assertEqual(len(calls), 1)
1739 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1740
Victor Stinnerb745a742010-05-18 17:17:23 +00001741 # test os._execvpe() with a relative path:
1742 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001743 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001744 self.assertRaises(OSError,
1745 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001746 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001747 self.assertSequenceEqual(calls[0],
1748 ('execve', native_fullpath, (arguments, env)))
1749
1750 # test os._execvpe() with a relative path:
1751 # os.get_exec_path() reads the 'PATH' variable
1752 with _execvpe_mockup() as calls:
1753 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001754 if test_type is bytes:
1755 env_path[b'PATH'] = program_path
1756 else:
1757 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001758 self.assertRaises(OSError,
1759 os._execvpe, program, arguments, env=env_path)
1760 self.assertEqual(len(calls), 1)
1761 self.assertSequenceEqual(calls[0],
1762 ('execve', native_fullpath, (arguments, env_path)))
1763
1764 def test_internal_execvpe_str(self):
1765 self._test_internal_execvpe(str)
1766 if os.name != "nt":
1767 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001768
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001769 def test_execve_invalid_env(self):
1770 args = [sys.executable, '-c', 'pass']
1771
Ville Skyttä49b27342017-08-03 09:00:59 +03001772 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001773 newenv = os.environ.copy()
1774 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1775 with self.assertRaises(ValueError):
1776 os.execve(args[0], args, newenv)
1777
Ville Skyttä49b27342017-08-03 09:00:59 +03001778 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001779 newenv = os.environ.copy()
1780 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1781 with self.assertRaises(ValueError):
1782 os.execve(args[0], args, newenv)
1783
Ville Skyttä49b27342017-08-03 09:00:59 +03001784 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001785 newenv = os.environ.copy()
1786 newenv["FRUIT=ORANGE"] = "lemon"
1787 with self.assertRaises(ValueError):
1788 os.execve(args[0], args, newenv)
1789
Alexey Izbyshev83460312018-10-20 03:28:22 +03001790 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1791 def test_execve_with_empty_path(self):
1792 # bpo-32890: Check GetLastError() misuse
1793 try:
1794 os.execve('', ['arg'], {})
1795 except OSError as e:
1796 self.assertTrue(e.winerror is None or e.winerror != 0)
1797 else:
1798 self.fail('No OSError raised')
1799
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001800
Serhiy Storchaka43767632013-11-03 21:31:38 +02001801@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001802class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001803 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001804 try:
1805 os.stat(support.TESTFN)
1806 except FileNotFoundError:
1807 exists = False
1808 except OSError as exc:
1809 exists = True
1810 self.fail("file %s must not exist; os.stat failed with %s"
1811 % (support.TESTFN, exc))
1812 else:
1813 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001814
Thomas Wouters477c8d52006-05-27 19:21:47 +00001815 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001816 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001817
1818 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001819 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001820
1821 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001822 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001823
1824 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001825 self.addCleanup(support.unlink, support.TESTFN)
1826
Victor Stinnere77c9742016-03-25 10:28:23 +01001827 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001828 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001829
1830 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001831 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001832
Thomas Wouters477c8d52006-05-27 19:21:47 +00001833 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001834 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001835
Victor Stinnere77c9742016-03-25 10:28:23 +01001836
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001837class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001838 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001839 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1840 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001841 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001842 def get_single(f):
1843 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001844 if hasattr(os, f):
1845 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001846 return helper
1847 for f in singles:
1848 locals()["test_"+f] = get_single(f)
1849
Benjamin Peterson7522c742009-01-19 21:00:09 +00001850 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001851 try:
1852 f(support.make_bad_fd(), *args)
1853 except OSError as e:
1854 self.assertEqual(e.errno, errno.EBADF)
1855 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001856 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001857 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001858
Serhiy Storchaka43767632013-11-03 21:31:38 +02001859 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001860 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001861 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001862
Serhiy Storchaka43767632013-11-03 21:31:38 +02001863 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001864 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865 fd = support.make_bad_fd()
1866 # Make sure none of the descriptors we are about to close are
1867 # currently valid (issue 6542).
1868 for i in range(10):
1869 try: os.fstat(fd+i)
1870 except OSError:
1871 pass
1872 else:
1873 break
1874 if i < 2:
1875 raise unittest.SkipTest(
1876 "Unable to acquire a range of invalid file descriptors")
1877 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001878
Serhiy Storchaka43767632013-11-03 21:31:38 +02001879 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001880 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001882
Serhiy Storchaka43767632013-11-03 21:31:38 +02001883 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001884 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001885 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001886
Serhiy Storchaka43767632013-11-03 21:31:38 +02001887 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001888 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001889 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001890
Serhiy Storchaka43767632013-11-03 21:31:38 +02001891 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001892 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001893 self.check(os.pathconf, "PC_NAME_MAX")
1894 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001895
Serhiy Storchaka43767632013-11-03 21:31:38 +02001896 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001897 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001898 self.check(os.truncate, 0)
1899 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001900
Serhiy Storchaka43767632013-11-03 21:31:38 +02001901 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001902 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001903 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001904
Serhiy Storchaka43767632013-11-03 21:31:38 +02001905 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001906 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001907 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001908
Victor Stinner57ddf782014-01-08 15:21:28 +01001909 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1910 def test_readv(self):
1911 buf = bytearray(10)
1912 self.check(os.readv, [buf])
1913
Serhiy Storchaka43767632013-11-03 21:31:38 +02001914 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001915 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001916 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001917
Serhiy Storchaka43767632013-11-03 21:31:38 +02001918 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001919 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001920 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001921
Victor Stinner57ddf782014-01-08 15:21:28 +01001922 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1923 def test_writev(self):
1924 self.check(os.writev, [b'abc'])
1925
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001926 def test_inheritable(self):
1927 self.check(os.get_inheritable)
1928 self.check(os.set_inheritable, True)
1929
1930 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1931 'needs os.get_blocking() and os.set_blocking()')
1932 def test_blocking(self):
1933 self.check(os.get_blocking)
1934 self.check(os.set_blocking, True)
1935
Brian Curtin1b9df392010-11-24 20:24:31 +00001936
1937class LinkTests(unittest.TestCase):
1938 def setUp(self):
1939 self.file1 = support.TESTFN
1940 self.file2 = os.path.join(support.TESTFN + "2")
1941
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001942 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001943 for file in (self.file1, self.file2):
1944 if os.path.exists(file):
1945 os.unlink(file)
1946
Brian Curtin1b9df392010-11-24 20:24:31 +00001947 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001948 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001949
xdegaye6a55d092017-11-12 17:57:04 +01001950 try:
1951 os.link(file1, file2)
1952 except PermissionError as e:
1953 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001954 with open(file1, "r") as f1, open(file2, "r") as f2:
1955 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1956
1957 def test_link(self):
1958 self._test_link(self.file1, self.file2)
1959
1960 def test_link_bytes(self):
1961 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1962 bytes(self.file2, sys.getfilesystemencoding()))
1963
Brian Curtinf498b752010-11-30 15:54:04 +00001964 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001965 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001966 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001967 except UnicodeError:
1968 raise unittest.SkipTest("Unable to encode for this platform.")
1969
Brian Curtinf498b752010-11-30 15:54:04 +00001970 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001971 self.file2 = self.file1 + "2"
1972 self._test_link(self.file1, self.file2)
1973
Serhiy Storchaka43767632013-11-03 21:31:38 +02001974@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1975class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001976 # uid_t and gid_t are 32-bit unsigned integers on Linux
1977 UID_OVERFLOW = (1 << 32)
1978 GID_OVERFLOW = (1 << 32)
1979
Serhiy Storchaka43767632013-11-03 21:31:38 +02001980 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1981 def test_setuid(self):
1982 if os.getuid() != 0:
1983 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001984 self.assertRaises(TypeError, os.setuid, 'not an int')
1985 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001986
Serhiy Storchaka43767632013-11-03 21:31:38 +02001987 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1988 def test_setgid(self):
1989 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1990 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001991 self.assertRaises(TypeError, os.setgid, 'not an int')
1992 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001993
Serhiy Storchaka43767632013-11-03 21:31:38 +02001994 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1995 def test_seteuid(self):
1996 if os.getuid() != 0:
1997 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001998 self.assertRaises(TypeError, os.setegid, 'not an int')
1999 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002000
Serhiy Storchaka43767632013-11-03 21:31:38 +02002001 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2002 def test_setegid(self):
2003 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2004 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002005 self.assertRaises(TypeError, os.setegid, 'not an int')
2006 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002007
Serhiy Storchaka43767632013-11-03 21:31:38 +02002008 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2009 def test_setreuid(self):
2010 if os.getuid() != 0:
2011 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002012 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2013 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2014 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2015 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002016
Serhiy Storchaka43767632013-11-03 21:31:38 +02002017 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2018 def test_setreuid_neg1(self):
2019 # Needs to accept -1. We run this in a subprocess to avoid
2020 # altering the test runner's process state (issue8045).
2021 subprocess.check_call([
2022 sys.executable, '-c',
2023 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002024
Serhiy Storchaka43767632013-11-03 21:31:38 +02002025 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2026 def test_setregid(self):
2027 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2028 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002029 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2030 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2031 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2032 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002033
Serhiy Storchaka43767632013-11-03 21:31:38 +02002034 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2035 def test_setregid_neg1(self):
2036 # Needs to accept -1. We run this in a subprocess to avoid
2037 # altering the test runner's process state (issue8045).
2038 subprocess.check_call([
2039 sys.executable, '-c',
2040 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002041
Serhiy Storchaka43767632013-11-03 21:31:38 +02002042@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2043class Pep383Tests(unittest.TestCase):
2044 def setUp(self):
2045 if support.TESTFN_UNENCODABLE:
2046 self.dir = support.TESTFN_UNENCODABLE
2047 elif support.TESTFN_NONASCII:
2048 self.dir = support.TESTFN_NONASCII
2049 else:
2050 self.dir = support.TESTFN
2051 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002052
Serhiy Storchaka43767632013-11-03 21:31:38 +02002053 bytesfn = []
2054 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002055 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002056 fn = os.fsencode(fn)
2057 except UnicodeEncodeError:
2058 return
2059 bytesfn.append(fn)
2060 add_filename(support.TESTFN_UNICODE)
2061 if support.TESTFN_UNENCODABLE:
2062 add_filename(support.TESTFN_UNENCODABLE)
2063 if support.TESTFN_NONASCII:
2064 add_filename(support.TESTFN_NONASCII)
2065 if not bytesfn:
2066 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002067
Serhiy Storchaka43767632013-11-03 21:31:38 +02002068 self.unicodefn = set()
2069 os.mkdir(self.dir)
2070 try:
2071 for fn in bytesfn:
2072 support.create_empty_file(os.path.join(self.bdir, fn))
2073 fn = os.fsdecode(fn)
2074 if fn in self.unicodefn:
2075 raise ValueError("duplicate filename")
2076 self.unicodefn.add(fn)
2077 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002078 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002079 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002080
Serhiy Storchaka43767632013-11-03 21:31:38 +02002081 def tearDown(self):
2082 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002083
Serhiy Storchaka43767632013-11-03 21:31:38 +02002084 def test_listdir(self):
2085 expected = self.unicodefn
2086 found = set(os.listdir(self.dir))
2087 self.assertEqual(found, expected)
2088 # test listdir without arguments
2089 current_directory = os.getcwd()
2090 try:
2091 os.chdir(os.sep)
2092 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2093 finally:
2094 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002095
Serhiy Storchaka43767632013-11-03 21:31:38 +02002096 def test_open(self):
2097 for fn in self.unicodefn:
2098 f = open(os.path.join(self.dir, fn), 'rb')
2099 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002100
Serhiy Storchaka43767632013-11-03 21:31:38 +02002101 @unittest.skipUnless(hasattr(os, 'statvfs'),
2102 "need os.statvfs()")
2103 def test_statvfs(self):
2104 # issue #9645
2105 for fn in self.unicodefn:
2106 # should not fail with file not found error
2107 fullname = os.path.join(self.dir, fn)
2108 os.statvfs(fullname)
2109
2110 def test_stat(self):
2111 for fn in self.unicodefn:
2112 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002113
Brian Curtineb24d742010-04-12 17:16:38 +00002114@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2115class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002116 def _kill(self, sig):
2117 # Start sys.executable as a subprocess and communicate from the
2118 # subprocess to the parent that the interpreter is ready. When it
2119 # becomes ready, send *sig* via os.kill to the subprocess and check
2120 # that the return code is equal to *sig*.
2121 import ctypes
2122 from ctypes import wintypes
2123 import msvcrt
2124
2125 # Since we can't access the contents of the process' stdout until the
2126 # process has exited, use PeekNamedPipe to see what's inside stdout
2127 # without waiting. This is done so we can tell that the interpreter
2128 # is started and running at a point where it could handle a signal.
2129 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2130 PeekNamedPipe.restype = wintypes.BOOL
2131 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2132 ctypes.POINTER(ctypes.c_char), # stdout buf
2133 wintypes.DWORD, # Buffer size
2134 ctypes.POINTER(wintypes.DWORD), # bytes read
2135 ctypes.POINTER(wintypes.DWORD), # bytes avail
2136 ctypes.POINTER(wintypes.DWORD)) # bytes left
2137 msg = "running"
2138 proc = subprocess.Popen([sys.executable, "-c",
2139 "import sys;"
2140 "sys.stdout.write('{}');"
2141 "sys.stdout.flush();"
2142 "input()".format(msg)],
2143 stdout=subprocess.PIPE,
2144 stderr=subprocess.PIPE,
2145 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002146 self.addCleanup(proc.stdout.close)
2147 self.addCleanup(proc.stderr.close)
2148 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002149
2150 count, max = 0, 100
2151 while count < max and proc.poll() is None:
2152 # Create a string buffer to store the result of stdout from the pipe
2153 buf = ctypes.create_string_buffer(len(msg))
2154 # Obtain the text currently in proc.stdout
2155 # Bytes read/avail/left are left as NULL and unused
2156 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2157 buf, ctypes.sizeof(buf), None, None, None)
2158 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2159 if buf.value:
2160 self.assertEqual(msg, buf.value.decode())
2161 break
2162 time.sleep(0.1)
2163 count += 1
2164 else:
2165 self.fail("Did not receive communication from the subprocess")
2166
Brian Curtineb24d742010-04-12 17:16:38 +00002167 os.kill(proc.pid, sig)
2168 self.assertEqual(proc.wait(), sig)
2169
2170 def test_kill_sigterm(self):
2171 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002172 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002173
2174 def test_kill_int(self):
2175 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002176 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002177
2178 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002179 tagname = "test_os_%s" % uuid.uuid1()
2180 m = mmap.mmap(-1, 1, tagname)
2181 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002182 # Run a script which has console control handling enabled.
2183 proc = subprocess.Popen([sys.executable,
2184 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002185 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002186 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2187 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002188 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002189 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002190 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002191 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002192 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002193 count += 1
2194 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002195 # Forcefully kill the process if we weren't able to signal it.
2196 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002197 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002198 os.kill(proc.pid, event)
2199 # proc.send_signal(event) could also be done here.
2200 # Allow time for the signal to be passed and the process to exit.
2201 time.sleep(0.5)
2202 if not proc.poll():
2203 # Forcefully kill the process if we weren't able to signal it.
2204 os.kill(proc.pid, signal.SIGINT)
2205 self.fail("subprocess did not stop on {}".format(name))
2206
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002207 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002208 def test_CTRL_C_EVENT(self):
2209 from ctypes import wintypes
2210 import ctypes
2211
2212 # Make a NULL value by creating a pointer with no argument.
2213 NULL = ctypes.POINTER(ctypes.c_int)()
2214 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2215 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2216 wintypes.BOOL)
2217 SetConsoleCtrlHandler.restype = wintypes.BOOL
2218
2219 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002220 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002221 # by subprocesses.
2222 SetConsoleCtrlHandler(NULL, 0)
2223
2224 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2225
2226 def test_CTRL_BREAK_EVENT(self):
2227 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2228
2229
Brian Curtind40e6f72010-07-08 21:39:08 +00002230@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002231class Win32ListdirTests(unittest.TestCase):
2232 """Test listdir on Windows."""
2233
2234 def setUp(self):
2235 self.created_paths = []
2236 for i in range(2):
2237 dir_name = 'SUB%d' % i
2238 dir_path = os.path.join(support.TESTFN, dir_name)
2239 file_name = 'FILE%d' % i
2240 file_path = os.path.join(support.TESTFN, file_name)
2241 os.makedirs(dir_path)
2242 with open(file_path, 'w') as f:
2243 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2244 self.created_paths.extend([dir_name, file_name])
2245 self.created_paths.sort()
2246
2247 def tearDown(self):
2248 shutil.rmtree(support.TESTFN)
2249
2250 def test_listdir_no_extended_path(self):
2251 """Test when the path is not an "extended" path."""
2252 # unicode
2253 self.assertEqual(
2254 sorted(os.listdir(support.TESTFN)),
2255 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002256
Tim Golden781bbeb2013-10-25 20:24:06 +01002257 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002258 self.assertEqual(
2259 sorted(os.listdir(os.fsencode(support.TESTFN))),
2260 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002261
2262 def test_listdir_extended_path(self):
2263 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002264 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002265 # unicode
2266 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2267 self.assertEqual(
2268 sorted(os.listdir(path)),
2269 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002270
Tim Golden781bbeb2013-10-25 20:24:06 +01002271 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002272 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2273 self.assertEqual(
2274 sorted(os.listdir(path)),
2275 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002276
2277
Berker Peksage0b5b202018-08-15 13:03:41 +03002278@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2279class ReadlinkTests(unittest.TestCase):
2280 filelink = 'readlinktest'
2281 filelink_target = os.path.abspath(__file__)
2282 filelinkb = os.fsencode(filelink)
2283 filelinkb_target = os.fsencode(filelink_target)
2284
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002285 def assertPathEqual(self, left, right):
2286 left = os.path.normcase(left)
2287 right = os.path.normcase(right)
2288 if sys.platform == 'win32':
2289 # Bad practice to blindly strip the prefix as it may be required to
2290 # correctly refer to the file, but we're only comparing paths here.
2291 has_prefix = lambda p: p.startswith(
2292 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2293 if has_prefix(left):
2294 left = left[4:]
2295 if has_prefix(right):
2296 right = right[4:]
2297 self.assertEqual(left, right)
2298
Berker Peksage0b5b202018-08-15 13:03:41 +03002299 def setUp(self):
2300 self.assertTrue(os.path.exists(self.filelink_target))
2301 self.assertTrue(os.path.exists(self.filelinkb_target))
2302 self.assertFalse(os.path.exists(self.filelink))
2303 self.assertFalse(os.path.exists(self.filelinkb))
2304
2305 def test_not_symlink(self):
2306 filelink_target = FakePath(self.filelink_target)
2307 self.assertRaises(OSError, os.readlink, self.filelink_target)
2308 self.assertRaises(OSError, os.readlink, filelink_target)
2309
2310 def test_missing_link(self):
2311 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2312 self.assertRaises(FileNotFoundError, os.readlink,
2313 FakePath('missing-link'))
2314
2315 @support.skip_unless_symlink
2316 def test_pathlike(self):
2317 os.symlink(self.filelink_target, self.filelink)
2318 self.addCleanup(support.unlink, self.filelink)
2319 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002320 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002321
2322 @support.skip_unless_symlink
2323 def test_pathlike_bytes(self):
2324 os.symlink(self.filelinkb_target, self.filelinkb)
2325 self.addCleanup(support.unlink, self.filelinkb)
2326 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002327 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002328 self.assertIsInstance(path, bytes)
2329
2330 @support.skip_unless_symlink
2331 def test_bytes(self):
2332 os.symlink(self.filelinkb_target, self.filelinkb)
2333 self.addCleanup(support.unlink, self.filelinkb)
2334 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002335 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002336 self.assertIsInstance(path, bytes)
2337
2338
Tim Golden781bbeb2013-10-25 20:24:06 +01002339@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002340@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002341class Win32SymlinkTests(unittest.TestCase):
2342 filelink = 'filelinktest'
2343 filelink_target = os.path.abspath(__file__)
2344 dirlink = 'dirlinktest'
2345 dirlink_target = os.path.dirname(filelink_target)
2346 missing_link = 'missing link'
2347
2348 def setUp(self):
2349 assert os.path.exists(self.dirlink_target)
2350 assert os.path.exists(self.filelink_target)
2351 assert not os.path.exists(self.dirlink)
2352 assert not os.path.exists(self.filelink)
2353 assert not os.path.exists(self.missing_link)
2354
2355 def tearDown(self):
2356 if os.path.exists(self.filelink):
2357 os.remove(self.filelink)
2358 if os.path.exists(self.dirlink):
2359 os.rmdir(self.dirlink)
2360 if os.path.lexists(self.missing_link):
2361 os.remove(self.missing_link)
2362
2363 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002364 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002365 self.assertTrue(os.path.exists(self.dirlink))
2366 self.assertTrue(os.path.isdir(self.dirlink))
2367 self.assertTrue(os.path.islink(self.dirlink))
2368 self.check_stat(self.dirlink, self.dirlink_target)
2369
2370 def test_file_link(self):
2371 os.symlink(self.filelink_target, self.filelink)
2372 self.assertTrue(os.path.exists(self.filelink))
2373 self.assertTrue(os.path.isfile(self.filelink))
2374 self.assertTrue(os.path.islink(self.filelink))
2375 self.check_stat(self.filelink, self.filelink_target)
2376
2377 def _create_missing_dir_link(self):
2378 'Create a "directory" link to a non-existent target'
2379 linkname = self.missing_link
2380 if os.path.lexists(linkname):
2381 os.remove(linkname)
2382 target = r'c:\\target does not exist.29r3c740'
2383 assert not os.path.exists(target)
2384 target_is_dir = True
2385 os.symlink(target, linkname, target_is_dir)
2386
2387 def test_remove_directory_link_to_missing_target(self):
2388 self._create_missing_dir_link()
2389 # For compatibility with Unix, os.remove will check the
2390 # directory status and call RemoveDirectory if the symlink
2391 # was created with target_is_dir==True.
2392 os.remove(self.missing_link)
2393
Brian Curtind40e6f72010-07-08 21:39:08 +00002394 def test_isdir_on_directory_link_to_missing_target(self):
2395 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002396 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002397
Brian Curtind40e6f72010-07-08 21:39:08 +00002398 def test_rmdir_on_directory_link_to_missing_target(self):
2399 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002400 os.rmdir(self.missing_link)
2401
2402 def check_stat(self, link, target):
2403 self.assertEqual(os.stat(link), os.stat(target))
2404 self.assertNotEqual(os.lstat(link), os.stat(link))
2405
Brian Curtind25aef52011-06-13 15:16:04 -05002406 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002407 self.assertEqual(os.stat(bytes_link), os.stat(target))
2408 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002409
2410 def test_12084(self):
2411 level1 = os.path.abspath(support.TESTFN)
2412 level2 = os.path.join(level1, "level2")
2413 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002414 self.addCleanup(support.rmtree, level1)
2415
2416 os.mkdir(level1)
2417 os.mkdir(level2)
2418 os.mkdir(level3)
2419
2420 file1 = os.path.abspath(os.path.join(level1, "file1"))
2421 create_file(file1)
2422
2423 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002424 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002425 os.chdir(level2)
2426 link = os.path.join(level2, "link")
2427 os.symlink(os.path.relpath(file1), "link")
2428 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002429
Victor Stinnerae39d232016-03-24 17:12:55 +01002430 # Check os.stat calls from the same dir as the link
2431 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002432
Victor Stinnerae39d232016-03-24 17:12:55 +01002433 # Check os.stat calls from a dir below the link
2434 os.chdir(level1)
2435 self.assertEqual(os.stat(file1),
2436 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002437
Victor Stinnerae39d232016-03-24 17:12:55 +01002438 # Check os.stat calls from a dir above the link
2439 os.chdir(level3)
2440 self.assertEqual(os.stat(file1),
2441 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002442 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002443 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002444
SSE43c34aad2018-02-13 00:10:35 +07002445 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2446 and os.path.exists(r'C:\ProgramData'),
2447 'Test directories not found')
2448 def test_29248(self):
2449 # os.symlink() calls CreateSymbolicLink, which creates
2450 # the reparse data buffer with the print name stored
2451 # first, so the offset is always 0. CreateSymbolicLink
2452 # stores the "PrintName" DOS path (e.g. "C:\") first,
2453 # with an offset of 0, followed by the "SubstituteName"
2454 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2455 # the other hand, seems to have been created manually
2456 # with an inverted order.
2457 target = os.readlink(r'C:\Users\All Users')
2458 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2459
Steve Dower6921e732018-03-05 14:26:08 -08002460 def test_buffer_overflow(self):
2461 # Older versions would have a buffer overflow when detecting
2462 # whether a link source was a directory. This test ensures we
2463 # no longer crash, but does not otherwise validate the behavior
2464 segment = 'X' * 27
2465 path = os.path.join(*[segment] * 10)
2466 test_cases = [
2467 # overflow with absolute src
2468 ('\\' + path, segment),
2469 # overflow dest with relative src
2470 (segment, path),
2471 # overflow when joining src
2472 (path[:180], path[:180]),
2473 ]
2474 for src, dest in test_cases:
2475 try:
2476 os.symlink(src, dest)
2477 except FileNotFoundError:
2478 pass
2479 else:
2480 try:
2481 os.remove(dest)
2482 except OSError:
2483 pass
2484 # Also test with bytes, since that is a separate code path.
2485 try:
2486 os.symlink(os.fsencode(src), os.fsencode(dest))
2487 except FileNotFoundError:
2488 pass
2489 else:
2490 try:
2491 os.remove(dest)
2492 except OSError:
2493 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002494
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002495 def test_appexeclink(self):
2496 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002497 if not os.path.isdir(root):
2498 self.skipTest("test requires a WindowsApps directory")
2499
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002500 aliases = [os.path.join(root, a)
2501 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2502
2503 for alias in aliases:
2504 if support.verbose:
2505 print()
2506 print("Testing with", alias)
2507 st = os.lstat(alias)
2508 self.assertEqual(st, os.stat(alias))
2509 self.assertFalse(stat.S_ISLNK(st.st_mode))
2510 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2511 # testing the first one we see is sufficient
2512 break
2513 else:
2514 self.skipTest("test requires an app execution alias")
2515
Tim Golden0321cf22014-05-05 19:46:17 +01002516@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2517class Win32JunctionTests(unittest.TestCase):
2518 junction = 'junctiontest'
2519 junction_target = os.path.dirname(os.path.abspath(__file__))
2520
2521 def setUp(self):
2522 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002523 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002524
2525 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002526 if os.path.lexists(self.junction):
2527 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002528
2529 def test_create_junction(self):
2530 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002531 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002532 self.assertTrue(os.path.exists(self.junction))
2533 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002534 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2535 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002536
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002537 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002538 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002539 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2540 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002541
2542 def test_unlink_removes_junction(self):
2543 _winapi.CreateJunction(self.junction_target, self.junction)
2544 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002545 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002546
2547 os.unlink(self.junction)
2548 self.assertFalse(os.path.exists(self.junction))
2549
Mark Becwarb82bfac2019-02-02 16:08:23 -05002550@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2551class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002552 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002553 nt = support.import_module('nt')
2554 ctypes = support.import_module('ctypes')
2555 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002556
2557 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2558 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2559
2560 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2561 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2562 ctypes.wintypes.LPDWORD)
2563
2564 # This is a pseudo-handle that doesn't need to be closed
2565 hproc = kernel.GetCurrentProcess()
2566
2567 handle_count = ctypes.wintypes.DWORD()
2568 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2569 self.assertEqual(1, ok)
2570
2571 before_count = handle_count.value
2572
2573 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002574 filenames = [
2575 r'\\?\C:',
2576 r'\\?\NUL',
2577 r'\\?\CONIN',
2578 __file__,
2579 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002580
Berker Peksag6ef726a2019-04-22 18:46:28 +03002581 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002582 for name in filenames:
2583 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002584 nt._getfinalpathname(name)
2585 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002586 # Failure is expected
2587 pass
2588 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002589 os.stat(name)
2590 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002591 pass
2592
2593 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2594 self.assertEqual(1, ok)
2595
2596 handle_delta = handle_count.value - before_count
2597
2598 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002599
Jason R. Coombs3a092862013-05-27 23:21:28 -04002600@support.skip_unless_symlink
2601class NonLocalSymlinkTests(unittest.TestCase):
2602
2603 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002604 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002605 Create this structure:
2606
2607 base
2608 \___ some_dir
2609 """
2610 os.makedirs('base/some_dir')
2611
2612 def tearDown(self):
2613 shutil.rmtree('base')
2614
2615 def test_directory_link_nonlocal(self):
2616 """
2617 The symlink target should resolve relative to the link, not relative
2618 to the current directory.
2619
2620 Then, link base/some_link -> base/some_dir and ensure that some_link
2621 is resolved as a directory.
2622
2623 In issue13772, it was discovered that directory detection failed if
2624 the symlink target was not specified relative to the current
2625 directory, which was a defect in the implementation.
2626 """
2627 src = os.path.join('base', 'some_link')
2628 os.symlink('some_dir', src)
2629 assert os.path.isdir(src)
2630
2631
Victor Stinnere8d51452010-08-19 01:05:19 +00002632class FSEncodingTests(unittest.TestCase):
2633 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002634 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2635 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002636
Victor Stinnere8d51452010-08-19 01:05:19 +00002637 def test_identity(self):
2638 # assert fsdecode(fsencode(x)) == x
2639 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2640 try:
2641 bytesfn = os.fsencode(fn)
2642 except UnicodeEncodeError:
2643 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002644 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002645
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002646
Brett Cannonefb00c02012-02-29 18:31:31 -05002647
2648class DeviceEncodingTests(unittest.TestCase):
2649
2650 def test_bad_fd(self):
2651 # Return None when an fd doesn't actually exist.
2652 self.assertIsNone(os.device_encoding(123456))
2653
Paul Monson62dfd7d2019-04-25 11:36:45 -07002654 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002655 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002656 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002657 def test_device_encoding(self):
2658 encoding = os.device_encoding(0)
2659 self.assertIsNotNone(encoding)
2660 self.assertTrue(codecs.lookup(encoding))
2661
2662
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002663class PidTests(unittest.TestCase):
2664 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2665 def test_getppid(self):
2666 p = subprocess.Popen([sys.executable, '-c',
2667 'import os; print(os.getppid())'],
2668 stdout=subprocess.PIPE)
2669 stdout, _ = p.communicate()
2670 # We are the parent of our subprocess
2671 self.assertEqual(int(stdout), os.getpid())
2672
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002673 def test_waitpid(self):
2674 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002675 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002676 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002677 status = os.waitpid(pid, 0)
2678 self.assertEqual(status, (pid, 0))
2679
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002680
Victor Stinner4659ccf2016-09-14 10:57:00 +02002681class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002682 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002683 self.exitcode = 17
2684
2685 filename = support.TESTFN
2686 self.addCleanup(support.unlink, filename)
2687
2688 if not with_env:
2689 code = 'import sys; sys.exit(%s)' % self.exitcode
2690 else:
2691 self.env = dict(os.environ)
2692 # create an unique key
2693 self.key = str(uuid.uuid4())
2694 self.env[self.key] = self.key
2695 # read the variable from os.environ to check that it exists
2696 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2697 % (self.key, self.exitcode))
2698
2699 with open(filename, "w") as fp:
2700 fp.write(code)
2701
Berker Peksag81816462016-09-15 20:19:47 +03002702 args = [sys.executable, filename]
2703 if use_bytes:
2704 args = [os.fsencode(a) for a in args]
2705 self.env = {os.fsencode(k): os.fsencode(v)
2706 for k, v in self.env.items()}
2707
2708 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002709
Berker Peksag4af23d72016-09-15 20:32:44 +03002710 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002711 def test_spawnl(self):
2712 args = self.create_args()
2713 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2714 self.assertEqual(exitcode, self.exitcode)
2715
Berker Peksag4af23d72016-09-15 20:32:44 +03002716 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002717 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002718 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002719 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2720 self.assertEqual(exitcode, self.exitcode)
2721
Berker Peksag4af23d72016-09-15 20:32:44 +03002722 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002723 def test_spawnlp(self):
2724 args = self.create_args()
2725 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2726 self.assertEqual(exitcode, self.exitcode)
2727
Berker Peksag4af23d72016-09-15 20:32:44 +03002728 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002729 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002730 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002731 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2732 self.assertEqual(exitcode, self.exitcode)
2733
Berker Peksag4af23d72016-09-15 20:32:44 +03002734 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002735 def test_spawnv(self):
2736 args = self.create_args()
2737 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2738 self.assertEqual(exitcode, self.exitcode)
2739
Berker Peksag4af23d72016-09-15 20:32:44 +03002740 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002741 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002742 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002743 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2744 self.assertEqual(exitcode, self.exitcode)
2745
Berker Peksag4af23d72016-09-15 20:32:44 +03002746 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002747 def test_spawnvp(self):
2748 args = self.create_args()
2749 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2750 self.assertEqual(exitcode, self.exitcode)
2751
Berker Peksag4af23d72016-09-15 20:32:44 +03002752 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002753 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002754 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002755 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2756 self.assertEqual(exitcode, self.exitcode)
2757
Berker Peksag4af23d72016-09-15 20:32:44 +03002758 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002759 def test_nowait(self):
2760 args = self.create_args()
2761 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2762 result = os.waitpid(pid, 0)
2763 self.assertEqual(result[0], pid)
2764 status = result[1]
2765 if hasattr(os, 'WIFEXITED'):
2766 self.assertTrue(os.WIFEXITED(status))
2767 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2768 else:
2769 self.assertEqual(status, self.exitcode << 8)
2770
Berker Peksag4af23d72016-09-15 20:32:44 +03002771 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002772 def test_spawnve_bytes(self):
2773 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2774 args = self.create_args(with_env=True, use_bytes=True)
2775 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2776 self.assertEqual(exitcode, self.exitcode)
2777
Steve Dower859fd7b2016-11-19 18:53:19 -08002778 @requires_os_func('spawnl')
2779 def test_spawnl_noargs(self):
2780 args = self.create_args()
2781 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002782 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002783
2784 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002785 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002786 args = self.create_args()
2787 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002788 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002789
2790 @requires_os_func('spawnv')
2791 def test_spawnv_noargs(self):
2792 args = self.create_args()
2793 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2794 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002795 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2796 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002797
2798 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002799 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002800 args = self.create_args()
2801 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2802 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002803 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2804 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002805
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002806 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002807 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002808
Ville Skyttä49b27342017-08-03 09:00:59 +03002809 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002810 newenv = os.environ.copy()
2811 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2812 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002813 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002814 except ValueError:
2815 pass
2816 else:
2817 self.assertEqual(exitcode, 127)
2818
Ville Skyttä49b27342017-08-03 09:00:59 +03002819 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002820 newenv = os.environ.copy()
2821 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2822 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002823 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002824 except ValueError:
2825 pass
2826 else:
2827 self.assertEqual(exitcode, 127)
2828
Ville Skyttä49b27342017-08-03 09:00:59 +03002829 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002830 newenv = os.environ.copy()
2831 newenv["FRUIT=ORANGE"] = "lemon"
2832 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002833 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002834 except ValueError:
2835 pass
2836 else:
2837 self.assertEqual(exitcode, 127)
2838
Ville Skyttä49b27342017-08-03 09:00:59 +03002839 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002840 filename = support.TESTFN
2841 self.addCleanup(support.unlink, filename)
2842 with open(filename, "w") as fp:
2843 fp.write('import sys, os\n'
2844 'if os.getenv("FRUIT") != "orange=lemon":\n'
2845 ' raise AssertionError')
2846 args = [sys.executable, filename]
2847 newenv = os.environ.copy()
2848 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002849 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002850 self.assertEqual(exitcode, 0)
2851
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002852 @requires_os_func('spawnve')
2853 def test_spawnve_invalid_env(self):
2854 self._test_invalid_env(os.spawnve)
2855
2856 @requires_os_func('spawnvpe')
2857 def test_spawnvpe_invalid_env(self):
2858 self._test_invalid_env(os.spawnvpe)
2859
Serhiy Storchaka77703942017-06-25 07:33:01 +03002860
Brian Curtin0151b8e2010-09-24 13:43:43 +00002861# The introduction of this TestCase caused at least two different errors on
2862# *nix buildbots. Temporarily skip this to let the buildbots move along.
2863@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002864@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2865class LoginTests(unittest.TestCase):
2866 def test_getlogin(self):
2867 user_name = os.getlogin()
2868 self.assertNotEqual(len(user_name), 0)
2869
2870
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002871@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2872 "needs os.getpriority and os.setpriority")
2873class ProgramPriorityTests(unittest.TestCase):
2874 """Tests for os.getpriority() and os.setpriority()."""
2875
2876 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002877
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002878 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2879 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2880 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002881 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2882 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002883 raise unittest.SkipTest("unable to reliably test setpriority "
2884 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002885 else:
2886 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002887 finally:
2888 try:
2889 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2890 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002891 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002892 raise
2893
2894
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002895class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002896
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002897 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002898
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002899 def __init__(self, conn):
2900 asynchat.async_chat.__init__(self, conn)
2901 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002902 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002903 self.closed = False
2904 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002905
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002906 def handle_read(self):
2907 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002908 if self.accumulate:
2909 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002910
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002911 def get_data(self):
2912 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002913
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002914 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002915 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002916 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002917
2918 def handle_error(self):
2919 raise
2920
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002921 def __init__(self, address):
2922 threading.Thread.__init__(self)
2923 asyncore.dispatcher.__init__(self)
2924 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2925 self.bind(address)
2926 self.listen(5)
2927 self.host, self.port = self.socket.getsockname()[:2]
2928 self.handler_instance = None
2929 self._active = False
2930 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002931
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002932 # --- public API
2933
2934 @property
2935 def running(self):
2936 return self._active
2937
2938 def start(self):
2939 assert not self.running
2940 self.__flag = threading.Event()
2941 threading.Thread.start(self)
2942 self.__flag.wait()
2943
2944 def stop(self):
2945 assert self.running
2946 self._active = False
2947 self.join()
2948
2949 def wait(self):
2950 # wait for handler connection to be closed, then stop the server
2951 while not getattr(self.handler_instance, "closed", False):
2952 time.sleep(0.001)
2953 self.stop()
2954
2955 # --- internals
2956
2957 def run(self):
2958 self._active = True
2959 self.__flag.set()
2960 while self._active and asyncore.socket_map:
2961 self._active_lock.acquire()
2962 asyncore.loop(timeout=0.001, count=1)
2963 self._active_lock.release()
2964 asyncore.close_all()
2965
2966 def handle_accept(self):
2967 conn, addr = self.accept()
2968 self.handler_instance = self.Handler(conn)
2969
2970 def handle_connect(self):
2971 self.close()
2972 handle_read = handle_connect
2973
2974 def writable(self):
2975 return 0
2976
2977 def handle_error(self):
2978 raise
2979
2980
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002981@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2982class TestSendfile(unittest.TestCase):
2983
Victor Stinner8c663fd2017-11-08 14:44:44 -08002984 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002985 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002986 not sys.platform.startswith("solaris") and \
2987 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002988 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2989 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002990 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2991 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002992
2993 @classmethod
2994 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002995 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002996 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002997
2998 @classmethod
2999 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003000 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003001 support.unlink(support.TESTFN)
3002
3003 def setUp(self):
3004 self.server = SendfileTestServer((support.HOST, 0))
3005 self.server.start()
3006 self.client = socket.socket()
3007 self.client.connect((self.server.host, self.server.port))
3008 self.client.settimeout(1)
3009 # synchronize by waiting for "220 ready" response
3010 self.client.recv(1024)
3011 self.sockno = self.client.fileno()
3012 self.file = open(support.TESTFN, 'rb')
3013 self.fileno = self.file.fileno()
3014
3015 def tearDown(self):
3016 self.file.close()
3017 self.client.close()
3018 if self.server.running:
3019 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003020 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003021
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003022 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003023 """A higher level wrapper representing how an application is
3024 supposed to use sendfile().
3025 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003026 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003027 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003028 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003029 except OSError as err:
3030 if err.errno == errno.ECONNRESET:
3031 # disconnected
3032 raise
3033 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3034 # we have to retry send data
3035 continue
3036 else:
3037 raise
3038
3039 def test_send_whole_file(self):
3040 # normal send
3041 total_sent = 0
3042 offset = 0
3043 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003044 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003045 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3046 if sent == 0:
3047 break
3048 offset += sent
3049 total_sent += sent
3050 self.assertTrue(sent <= nbytes)
3051 self.assertEqual(offset, total_sent)
3052
3053 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003054 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003055 self.client.close()
3056 self.server.wait()
3057 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003058 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003059 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003060
3061 def test_send_at_certain_offset(self):
3062 # start sending a file at a certain offset
3063 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003064 offset = len(self.DATA) // 2
3065 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003066 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003067 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003068 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3069 if sent == 0:
3070 break
3071 offset += sent
3072 total_sent += sent
3073 self.assertTrue(sent <= nbytes)
3074
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003075 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003076 self.client.close()
3077 self.server.wait()
3078 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003079 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003080 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003081 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003082 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003083
3084 def test_offset_overflow(self):
3085 # specify an offset > file size
3086 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003087 try:
3088 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3089 except OSError as e:
3090 # Solaris can raise EINVAL if offset >= file length, ignore.
3091 if e.errno != errno.EINVAL:
3092 raise
3093 else:
3094 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003095 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003096 self.client.close()
3097 self.server.wait()
3098 data = self.server.handler_instance.get_data()
3099 self.assertEqual(data, b'')
3100
3101 def test_invalid_offset(self):
3102 with self.assertRaises(OSError) as cm:
3103 os.sendfile(self.sockno, self.fileno, -1, 4096)
3104 self.assertEqual(cm.exception.errno, errno.EINVAL)
3105
Martin Panterbf19d162015-09-09 01:01:13 +00003106 def test_keywords(self):
3107 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003108 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3109 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003110 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003111 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3112 offset=0, count=4096,
3113 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003114
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003115 # --- headers / trailers tests
3116
Serhiy Storchaka43767632013-11-03 21:31:38 +02003117 @requires_headers_trailers
3118 def test_headers(self):
3119 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003120 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003121 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003122 headers=[b"x" * 512, b"y" * 256])
3123 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003124 total_sent += sent
3125 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003126 while total_sent < len(expected_data):
3127 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003128 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3129 offset, nbytes)
3130 if sent == 0:
3131 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003132 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003133 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003134 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003135
Serhiy Storchaka43767632013-11-03 21:31:38 +02003136 self.assertEqual(total_sent, len(expected_data))
3137 self.client.close()
3138 self.server.wait()
3139 data = self.server.handler_instance.get_data()
3140 self.assertEqual(hash(data), hash(expected_data))
3141
3142 @requires_headers_trailers
3143 def test_trailers(self):
3144 TESTFN2 = support.TESTFN + "2"
3145 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003146
3147 self.addCleanup(support.unlink, TESTFN2)
3148 create_file(TESTFN2, file_data)
3149
3150 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003151 os.sendfile(self.sockno, f.fileno(), 0, 5,
3152 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003153 self.client.close()
3154 self.server.wait()
3155 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003156 self.assertEqual(data, b"abcde123456789")
3157
3158 @requires_headers_trailers
3159 @requires_32b
3160 def test_headers_overflow_32bits(self):
3161 self.server.handler_instance.accumulate = False
3162 with self.assertRaises(OSError) as cm:
3163 os.sendfile(self.sockno, self.fileno, 0, 0,
3164 headers=[b"x" * 2**16] * 2**15)
3165 self.assertEqual(cm.exception.errno, errno.EINVAL)
3166
3167 @requires_headers_trailers
3168 @requires_32b
3169 def test_trailers_overflow_32bits(self):
3170 self.server.handler_instance.accumulate = False
3171 with self.assertRaises(OSError) as cm:
3172 os.sendfile(self.sockno, self.fileno, 0, 0,
3173 trailers=[b"x" * 2**16] * 2**15)
3174 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003175
Serhiy Storchaka43767632013-11-03 21:31:38 +02003176 @requires_headers_trailers
3177 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3178 'test needs os.SF_NODISKIO')
3179 def test_flags(self):
3180 try:
3181 os.sendfile(self.sockno, self.fileno, 0, 4096,
3182 flags=os.SF_NODISKIO)
3183 except OSError as err:
3184 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3185 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003186
3187
Larry Hastings9cf065c2012-06-22 16:30:09 -07003188def supports_extended_attributes():
3189 if not hasattr(os, "setxattr"):
3190 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003191
Larry Hastings9cf065c2012-06-22 16:30:09 -07003192 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003193 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003194 try:
3195 os.setxattr(fp.fileno(), b"user.test", b"")
3196 except OSError:
3197 return False
3198 finally:
3199 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003200
3201 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003202
3203
3204@unittest.skipUnless(supports_extended_attributes(),
3205 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003206# Kernels < 2.6.39 don't respect setxattr flags.
3207@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003208class ExtendedAttributeTests(unittest.TestCase):
3209
Larry Hastings9cf065c2012-06-22 16:30:09 -07003210 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003211 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003212 self.addCleanup(support.unlink, fn)
3213 create_file(fn)
3214
Benjamin Peterson799bd802011-08-31 22:15:17 -04003215 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003216 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003217 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003218
Victor Stinnerf12e5062011-10-16 22:12:03 +02003219 init_xattr = listxattr(fn)
3220 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003221
Larry Hastings9cf065c2012-06-22 16:30:09 -07003222 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003223 xattr = set(init_xattr)
3224 xattr.add("user.test")
3225 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003226 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3227 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3228 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003229
Benjamin Peterson799bd802011-08-31 22:15:17 -04003230 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003231 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003232 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003233
Benjamin Peterson799bd802011-08-31 22:15:17 -04003234 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003235 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003236 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003237
Larry Hastings9cf065c2012-06-22 16:30:09 -07003238 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003239 xattr.add("user.test2")
3240 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003241 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003242
Benjamin Peterson799bd802011-08-31 22:15:17 -04003243 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003244 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003245 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003246
Victor Stinnerf12e5062011-10-16 22:12:03 +02003247 xattr.remove("user.test")
3248 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003249 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3250 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3251 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3252 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003253 many = sorted("user.test{}".format(i) for i in range(100))
3254 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003255 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003256 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003257
Larry Hastings9cf065c2012-06-22 16:30:09 -07003258 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003259 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003260 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003261
3262 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3263 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003264
3265 def test_simple(self):
3266 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3267 os.listxattr)
3268
3269 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003270 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3271 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003272
3273 def test_fds(self):
3274 def getxattr(path, *args):
3275 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003276 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003277 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003278 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003279 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003280 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003281 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003282 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003283 def listxattr(path, *args):
3284 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003285 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003286 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3287
3288
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003289@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3290class TermsizeTests(unittest.TestCase):
3291 def test_does_not_crash(self):
3292 """Check if get_terminal_size() returns a meaningful value.
3293
3294 There's no easy portable way to actually check the size of the
3295 terminal, so let's check if it returns something sensible instead.
3296 """
3297 try:
3298 size = os.get_terminal_size()
3299 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003300 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003301 # Under win32 a generic OSError can be thrown if the
3302 # handle cannot be retrieved
3303 self.skipTest("failed to query terminal size")
3304 raise
3305
Antoine Pitroucfade362012-02-08 23:48:59 +01003306 self.assertGreaterEqual(size.columns, 0)
3307 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003308
3309 def test_stty_match(self):
3310 """Check if stty returns the same results
3311
3312 stty actually tests stdin, so get_terminal_size is invoked on
3313 stdin explicitly. If stty succeeded, then get_terminal_size()
3314 should work too.
3315 """
3316 try:
3317 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003318 except (FileNotFoundError, subprocess.CalledProcessError,
3319 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003320 self.skipTest("stty invocation failed")
3321 expected = (int(size[1]), int(size[0])) # reversed order
3322
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003323 try:
3324 actual = os.get_terminal_size(sys.__stdin__.fileno())
3325 except OSError as e:
3326 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3327 # Under win32 a generic OSError can be thrown if the
3328 # handle cannot be retrieved
3329 self.skipTest("failed to query terminal size")
3330 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003331 self.assertEqual(expected, actual)
3332
3333
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003334@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003335@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003336class MemfdCreateTests(unittest.TestCase):
3337 def test_memfd_create(self):
3338 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3339 self.assertNotEqual(fd, -1)
3340 self.addCleanup(os.close, fd)
3341 self.assertFalse(os.get_inheritable(fd))
3342 with open(fd, "wb", closefd=False) as f:
3343 f.write(b'memfd_create')
3344 self.assertEqual(f.tell(), 12)
3345
3346 fd2 = os.memfd_create("Hi")
3347 self.addCleanup(os.close, fd2)
3348 self.assertFalse(os.get_inheritable(fd2))
3349
3350
Victor Stinner292c8352012-10-30 02:17:38 +01003351class OSErrorTests(unittest.TestCase):
3352 def setUp(self):
3353 class Str(str):
3354 pass
3355
Victor Stinnerafe17062012-10-31 22:47:43 +01003356 self.bytes_filenames = []
3357 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003358 if support.TESTFN_UNENCODABLE is not None:
3359 decoded = support.TESTFN_UNENCODABLE
3360 else:
3361 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003362 self.unicode_filenames.append(decoded)
3363 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003364 if support.TESTFN_UNDECODABLE is not None:
3365 encoded = support.TESTFN_UNDECODABLE
3366 else:
3367 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003368 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003369 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003370 self.bytes_filenames.append(memoryview(encoded))
3371
3372 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003373
3374 def test_oserror_filename(self):
3375 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003376 (self.filenames, os.chdir,),
3377 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003378 (self.filenames, os.lstat,),
3379 (self.filenames, os.open, os.O_RDONLY),
3380 (self.filenames, os.rmdir,),
3381 (self.filenames, os.stat,),
3382 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003383 ]
3384 if sys.platform == "win32":
3385 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003386 (self.bytes_filenames, os.rename, b"dst"),
3387 (self.bytes_filenames, os.replace, b"dst"),
3388 (self.unicode_filenames, os.rename, "dst"),
3389 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003390 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003391 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003392 else:
3393 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003394 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003395 (self.filenames, os.rename, "dst"),
3396 (self.filenames, os.replace, "dst"),
3397 ))
3398 if hasattr(os, "chown"):
3399 funcs.append((self.filenames, os.chown, 0, 0))
3400 if hasattr(os, "lchown"):
3401 funcs.append((self.filenames, os.lchown, 0, 0))
3402 if hasattr(os, "truncate"):
3403 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003404 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003405 funcs.append((self.filenames, os.chflags, 0))
3406 if hasattr(os, "lchflags"):
3407 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003408 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003409 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003410 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003411 if sys.platform == "win32":
3412 funcs.append((self.bytes_filenames, os.link, b"dst"))
3413 funcs.append((self.unicode_filenames, os.link, "dst"))
3414 else:
3415 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003416 if hasattr(os, "listxattr"):
3417 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003418 (self.filenames, os.listxattr,),
3419 (self.filenames, os.getxattr, "user.test"),
3420 (self.filenames, os.setxattr, "user.test", b'user'),
3421 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003422 ))
3423 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003424 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003425 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003426 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003427
Steve Dowercc16be82016-09-08 10:35:16 -07003428
Victor Stinnerafe17062012-10-31 22:47:43 +01003429 for filenames, func, *func_args in funcs:
3430 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003431 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003432 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003433 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003434 else:
3435 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3436 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003437 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003438 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003439 except UnicodeDecodeError:
3440 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003441 else:
3442 self.fail("No exception thrown by {}".format(func))
3443
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003444class CPUCountTests(unittest.TestCase):
3445 def test_cpu_count(self):
3446 cpus = os.cpu_count()
3447 if cpus is not None:
3448 self.assertIsInstance(cpus, int)
3449 self.assertGreater(cpus, 0)
3450 else:
3451 self.skipTest("Could not determine the number of CPUs")
3452
Victor Stinnerdaf45552013-08-28 00:53:59 +02003453
3454class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003455 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003456 fd = os.open(__file__, os.O_RDONLY)
3457 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003458 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003459
Victor Stinnerdaf45552013-08-28 00:53:59 +02003460 os.set_inheritable(fd, True)
3461 self.assertEqual(os.get_inheritable(fd), True)
3462
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003463 @unittest.skipIf(fcntl is None, "need fcntl")
3464 def test_get_inheritable_cloexec(self):
3465 fd = os.open(__file__, os.O_RDONLY)
3466 self.addCleanup(os.close, fd)
3467 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003468
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003469 # clear FD_CLOEXEC flag
3470 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3471 flags &= ~fcntl.FD_CLOEXEC
3472 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003473
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003474 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003475
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003476 @unittest.skipIf(fcntl is None, "need fcntl")
3477 def test_set_inheritable_cloexec(self):
3478 fd = os.open(__file__, os.O_RDONLY)
3479 self.addCleanup(os.close, fd)
3480 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3481 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003482
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003483 os.set_inheritable(fd, True)
3484 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3485 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003486
Victor Stinnerdaf45552013-08-28 00:53:59 +02003487 def test_open(self):
3488 fd = os.open(__file__, os.O_RDONLY)
3489 self.addCleanup(os.close, fd)
3490 self.assertEqual(os.get_inheritable(fd), False)
3491
3492 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3493 def test_pipe(self):
3494 rfd, wfd = os.pipe()
3495 self.addCleanup(os.close, rfd)
3496 self.addCleanup(os.close, wfd)
3497 self.assertEqual(os.get_inheritable(rfd), False)
3498 self.assertEqual(os.get_inheritable(wfd), False)
3499
3500 def test_dup(self):
3501 fd1 = os.open(__file__, os.O_RDONLY)
3502 self.addCleanup(os.close, fd1)
3503
3504 fd2 = os.dup(fd1)
3505 self.addCleanup(os.close, fd2)
3506 self.assertEqual(os.get_inheritable(fd2), False)
3507
Zackery Spytz5be66602019-08-23 12:38:41 -06003508 def test_dup_standard_stream(self):
3509 fd = os.dup(1)
3510 self.addCleanup(os.close, fd)
3511 self.assertGreater(fd, 0)
3512
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003513 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3514 def test_dup_nul(self):
3515 # os.dup() was creating inheritable fds for character files.
3516 fd1 = os.open('NUL', os.O_RDONLY)
3517 self.addCleanup(os.close, fd1)
3518 fd2 = os.dup(fd1)
3519 self.addCleanup(os.close, fd2)
3520 self.assertFalse(os.get_inheritable(fd2))
3521
Victor Stinnerdaf45552013-08-28 00:53:59 +02003522 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3523 def test_dup2(self):
3524 fd = os.open(__file__, os.O_RDONLY)
3525 self.addCleanup(os.close, fd)
3526
3527 # inheritable by default
3528 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003529 self.addCleanup(os.close, fd2)
3530 self.assertEqual(os.dup2(fd, fd2), fd2)
3531 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003532
3533 # force non-inheritable
3534 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003535 self.addCleanup(os.close, fd3)
3536 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3537 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003538
3539 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3540 def test_openpty(self):
3541 master_fd, slave_fd = os.openpty()
3542 self.addCleanup(os.close, master_fd)
3543 self.addCleanup(os.close, slave_fd)
3544 self.assertEqual(os.get_inheritable(master_fd), False)
3545 self.assertEqual(os.get_inheritable(slave_fd), False)
3546
3547
Brett Cannon3f9183b2016-08-26 14:44:48 -07003548class PathTConverterTests(unittest.TestCase):
3549 # tuples of (function name, allows fd arguments, additional arguments to
3550 # function, cleanup function)
3551 functions = [
3552 ('stat', True, (), None),
3553 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003554 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003555 ('chflags', False, (0,), None),
3556 ('lchflags', False, (0,), None),
3557 ('open', False, (0,), getattr(os, 'close', None)),
3558 ]
3559
3560 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003561 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003562 if os.name == 'nt':
3563 bytes_fspath = bytes_filename = None
3564 else:
3565 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003566 bytes_fspath = FakePath(bytes_filename)
3567 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003568 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003569 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003570
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003571 int_fspath = FakePath(fd)
3572 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003573
3574 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3575 with self.subTest(name=name):
3576 try:
3577 fn = getattr(os, name)
3578 except AttributeError:
3579 continue
3580
Brett Cannon8f96a302016-08-26 19:30:11 -07003581 for path in (str_filename, bytes_filename, str_fspath,
3582 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003583 if path is None:
3584 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003585 with self.subTest(name=name, path=path):
3586 result = fn(path, *extra_args)
3587 if cleanup_fn is not None:
3588 cleanup_fn(result)
3589
3590 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003591 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003592 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003593
3594 if allow_fd:
3595 result = fn(fd, *extra_args) # should not fail
3596 if cleanup_fn is not None:
3597 cleanup_fn(result)
3598 else:
3599 with self.assertRaisesRegex(
3600 TypeError,
3601 'os.PathLike'):
3602 fn(fd, *extra_args)
3603
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003604 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003605 msg = r'__fspath__\(\) to return str or bytes, not %s'
3606 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003607 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003608 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003609 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003610 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003611 os.stat(FakePath(object()))
3612
Brett Cannon3f9183b2016-08-26 14:44:48 -07003613
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003614@unittest.skipUnless(hasattr(os, 'get_blocking'),
3615 'needs os.get_blocking() and os.set_blocking()')
3616class BlockingTests(unittest.TestCase):
3617 def test_blocking(self):
3618 fd = os.open(__file__, os.O_RDONLY)
3619 self.addCleanup(os.close, fd)
3620 self.assertEqual(os.get_blocking(fd), True)
3621
3622 os.set_blocking(fd, False)
3623 self.assertEqual(os.get_blocking(fd), False)
3624
3625 os.set_blocking(fd, True)
3626 self.assertEqual(os.get_blocking(fd), True)
3627
3628
Yury Selivanov97e2e062014-09-26 12:33:06 -04003629
3630class ExportsTests(unittest.TestCase):
3631 def test_os_all(self):
3632 self.assertIn('open', os.__all__)
3633 self.assertIn('walk', os.__all__)
3634
3635
Eddie Elizondob3966632019-11-05 07:16:14 -08003636class TestDirEntry(unittest.TestCase):
3637 def setUp(self):
3638 self.path = os.path.realpath(support.TESTFN)
3639 self.addCleanup(support.rmtree, self.path)
3640 os.mkdir(self.path)
3641
3642 def test_uninstantiable(self):
3643 self.assertRaises(TypeError, os.DirEntry)
3644
3645 def test_unpickable(self):
3646 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3647 entry = [entry for entry in os.scandir(self.path)].pop()
3648 self.assertIsInstance(entry, os.DirEntry)
3649 self.assertEqual(entry.name, "file.txt")
3650 import pickle
3651 self.assertRaises(TypeError, pickle.dumps, entry, filename)
3652
3653
Victor Stinner6036e442015-03-08 01:58:04 +01003654class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003655 check_no_resource_warning = support.check_no_resource_warning
3656
Victor Stinner6036e442015-03-08 01:58:04 +01003657 def setUp(self):
3658 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003659 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003660 self.addCleanup(support.rmtree, self.path)
3661 os.mkdir(self.path)
3662
3663 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003664 path = self.bytes_path if isinstance(name, bytes) else self.path
3665 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003666 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003667 return filename
3668
3669 def get_entries(self, names):
3670 entries = dict((entry.name, entry)
3671 for entry in os.scandir(self.path))
3672 self.assertEqual(sorted(entries.keys()), names)
3673 return entries
3674
3675 def assert_stat_equal(self, stat1, stat2, skip_fields):
3676 if skip_fields:
3677 for attr in dir(stat1):
3678 if not attr.startswith("st_"):
3679 continue
3680 if attr in ("st_dev", "st_ino", "st_nlink"):
3681 continue
3682 self.assertEqual(getattr(stat1, attr),
3683 getattr(stat2, attr),
3684 (stat1, stat2, attr))
3685 else:
3686 self.assertEqual(stat1, stat2)
3687
Eddie Elizondob3966632019-11-05 07:16:14 -08003688 def test_uninstantiable(self):
3689 scandir_iter = os.scandir(self.path)
3690 self.assertRaises(TypeError, type(scandir_iter))
3691 scandir_iter.close()
3692
3693 def test_unpickable(self):
3694 filename = self.create_file("file.txt")
3695 scandir_iter = os.scandir(self.path)
3696 import pickle
3697 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3698 scandir_iter.close()
3699
Victor Stinner6036e442015-03-08 01:58:04 +01003700 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003701 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003702 self.assertEqual(entry.name, name)
3703 self.assertEqual(entry.path, os.path.join(self.path, name))
3704 self.assertEqual(entry.inode(),
3705 os.stat(entry.path, follow_symlinks=False).st_ino)
3706
3707 entry_stat = os.stat(entry.path)
3708 self.assertEqual(entry.is_dir(),
3709 stat.S_ISDIR(entry_stat.st_mode))
3710 self.assertEqual(entry.is_file(),
3711 stat.S_ISREG(entry_stat.st_mode))
3712 self.assertEqual(entry.is_symlink(),
3713 os.path.islink(entry.path))
3714
3715 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3716 self.assertEqual(entry.is_dir(follow_symlinks=False),
3717 stat.S_ISDIR(entry_lstat.st_mode))
3718 self.assertEqual(entry.is_file(follow_symlinks=False),
3719 stat.S_ISREG(entry_lstat.st_mode))
3720
3721 self.assert_stat_equal(entry.stat(),
3722 entry_stat,
3723 os.name == 'nt' and not is_symlink)
3724 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3725 entry_lstat,
3726 os.name == 'nt')
3727
3728 def test_attributes(self):
3729 link = hasattr(os, 'link')
3730 symlink = support.can_symlink()
3731
3732 dirname = os.path.join(self.path, "dir")
3733 os.mkdir(dirname)
3734 filename = self.create_file("file.txt")
3735 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003736 try:
3737 os.link(filename, os.path.join(self.path, "link_file.txt"))
3738 except PermissionError as e:
3739 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003740 if symlink:
3741 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3742 target_is_directory=True)
3743 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3744
3745 names = ['dir', 'file.txt']
3746 if link:
3747 names.append('link_file.txt')
3748 if symlink:
3749 names.extend(('symlink_dir', 'symlink_file.txt'))
3750 entries = self.get_entries(names)
3751
3752 entry = entries['dir']
3753 self.check_entry(entry, 'dir', True, False, False)
3754
3755 entry = entries['file.txt']
3756 self.check_entry(entry, 'file.txt', False, True, False)
3757
3758 if link:
3759 entry = entries['link_file.txt']
3760 self.check_entry(entry, 'link_file.txt', False, True, False)
3761
3762 if symlink:
3763 entry = entries['symlink_dir']
3764 self.check_entry(entry, 'symlink_dir', True, False, True)
3765
3766 entry = entries['symlink_file.txt']
3767 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3768
3769 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003770 path = self.bytes_path if isinstance(name, bytes) else self.path
3771 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003772 self.assertEqual(len(entries), 1)
3773
3774 entry = entries[0]
3775 self.assertEqual(entry.name, name)
3776 return entry
3777
Brett Cannon96881cd2016-06-10 14:37:21 -07003778 def create_file_entry(self, name='file.txt'):
3779 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003780 return self.get_entry(os.path.basename(filename))
3781
3782 def test_current_directory(self):
3783 filename = self.create_file()
3784 old_dir = os.getcwd()
3785 try:
3786 os.chdir(self.path)
3787
3788 # call scandir() without parameter: it must list the content
3789 # of the current directory
3790 entries = dict((entry.name, entry) for entry in os.scandir())
3791 self.assertEqual(sorted(entries.keys()),
3792 [os.path.basename(filename)])
3793 finally:
3794 os.chdir(old_dir)
3795
3796 def test_repr(self):
3797 entry = self.create_file_entry()
3798 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3799
Brett Cannon96881cd2016-06-10 14:37:21 -07003800 def test_fspath_protocol(self):
3801 entry = self.create_file_entry()
3802 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3803
3804 def test_fspath_protocol_bytes(self):
3805 bytes_filename = os.fsencode('bytesfile.txt')
3806 bytes_entry = self.create_file_entry(name=bytes_filename)
3807 fspath = os.fspath(bytes_entry)
3808 self.assertIsInstance(fspath, bytes)
3809 self.assertEqual(fspath,
3810 os.path.join(os.fsencode(self.path),bytes_filename))
3811
Victor Stinner6036e442015-03-08 01:58:04 +01003812 def test_removed_dir(self):
3813 path = os.path.join(self.path, 'dir')
3814
3815 os.mkdir(path)
3816 entry = self.get_entry('dir')
3817 os.rmdir(path)
3818
3819 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3820 if os.name == 'nt':
3821 self.assertTrue(entry.is_dir())
3822 self.assertFalse(entry.is_file())
3823 self.assertFalse(entry.is_symlink())
3824 if os.name == 'nt':
3825 self.assertRaises(FileNotFoundError, entry.inode)
3826 # don't fail
3827 entry.stat()
3828 entry.stat(follow_symlinks=False)
3829 else:
3830 self.assertGreater(entry.inode(), 0)
3831 self.assertRaises(FileNotFoundError, entry.stat)
3832 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3833
3834 def test_removed_file(self):
3835 entry = self.create_file_entry()
3836 os.unlink(entry.path)
3837
3838 self.assertFalse(entry.is_dir())
3839 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3840 if os.name == 'nt':
3841 self.assertTrue(entry.is_file())
3842 self.assertFalse(entry.is_symlink())
3843 if os.name == 'nt':
3844 self.assertRaises(FileNotFoundError, entry.inode)
3845 # don't fail
3846 entry.stat()
3847 entry.stat(follow_symlinks=False)
3848 else:
3849 self.assertGreater(entry.inode(), 0)
3850 self.assertRaises(FileNotFoundError, entry.stat)
3851 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3852
3853 def test_broken_symlink(self):
3854 if not support.can_symlink():
3855 return self.skipTest('cannot create symbolic link')
3856
3857 filename = self.create_file("file.txt")
3858 os.symlink(filename,
3859 os.path.join(self.path, "symlink.txt"))
3860 entries = self.get_entries(['file.txt', 'symlink.txt'])
3861 entry = entries['symlink.txt']
3862 os.unlink(filename)
3863
3864 self.assertGreater(entry.inode(), 0)
3865 self.assertFalse(entry.is_dir())
3866 self.assertFalse(entry.is_file()) # broken symlink returns False
3867 self.assertFalse(entry.is_dir(follow_symlinks=False))
3868 self.assertFalse(entry.is_file(follow_symlinks=False))
3869 self.assertTrue(entry.is_symlink())
3870 self.assertRaises(FileNotFoundError, entry.stat)
3871 # don't fail
3872 entry.stat(follow_symlinks=False)
3873
3874 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003875 self.create_file("file.txt")
3876
3877 path_bytes = os.fsencode(self.path)
3878 entries = list(os.scandir(path_bytes))
3879 self.assertEqual(len(entries), 1, entries)
3880 entry = entries[0]
3881
3882 self.assertEqual(entry.name, b'file.txt')
3883 self.assertEqual(entry.path,
3884 os.fsencode(os.path.join(self.path, 'file.txt')))
3885
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003886 def test_bytes_like(self):
3887 self.create_file("file.txt")
3888
3889 for cls in bytearray, memoryview:
3890 path_bytes = cls(os.fsencode(self.path))
3891 with self.assertWarns(DeprecationWarning):
3892 entries = list(os.scandir(path_bytes))
3893 self.assertEqual(len(entries), 1, entries)
3894 entry = entries[0]
3895
3896 self.assertEqual(entry.name, b'file.txt')
3897 self.assertEqual(entry.path,
3898 os.fsencode(os.path.join(self.path, 'file.txt')))
3899 self.assertIs(type(entry.name), bytes)
3900 self.assertIs(type(entry.path), bytes)
3901
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003902 @unittest.skipUnless(os.listdir in os.supports_fd,
3903 'fd support for listdir required for this test.')
3904 def test_fd(self):
3905 self.assertIn(os.scandir, os.supports_fd)
3906 self.create_file('file.txt')
3907 expected_names = ['file.txt']
3908 if support.can_symlink():
3909 os.symlink('file.txt', os.path.join(self.path, 'link'))
3910 expected_names.append('link')
3911
3912 fd = os.open(self.path, os.O_RDONLY)
3913 try:
3914 with os.scandir(fd) as it:
3915 entries = list(it)
3916 names = [entry.name for entry in entries]
3917 self.assertEqual(sorted(names), expected_names)
3918 self.assertEqual(names, os.listdir(fd))
3919 for entry in entries:
3920 self.assertEqual(entry.path, entry.name)
3921 self.assertEqual(os.fspath(entry), entry.name)
3922 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3923 if os.stat in os.supports_dir_fd:
3924 st = os.stat(entry.name, dir_fd=fd)
3925 self.assertEqual(entry.stat(), st)
3926 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3927 self.assertEqual(entry.stat(follow_symlinks=False), st)
3928 finally:
3929 os.close(fd)
3930
Victor Stinner6036e442015-03-08 01:58:04 +01003931 def test_empty_path(self):
3932 self.assertRaises(FileNotFoundError, os.scandir, '')
3933
3934 def test_consume_iterator_twice(self):
3935 self.create_file("file.txt")
3936 iterator = os.scandir(self.path)
3937
3938 entries = list(iterator)
3939 self.assertEqual(len(entries), 1, entries)
3940
3941 # check than consuming the iterator twice doesn't raise exception
3942 entries2 = list(iterator)
3943 self.assertEqual(len(entries2), 0, entries2)
3944
3945 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003946 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003947 self.assertRaises(TypeError, os.scandir, obj)
3948
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003949 def test_close(self):
3950 self.create_file("file.txt")
3951 self.create_file("file2.txt")
3952 iterator = os.scandir(self.path)
3953 next(iterator)
3954 iterator.close()
3955 # multiple closes
3956 iterator.close()
3957 with self.check_no_resource_warning():
3958 del iterator
3959
3960 def test_context_manager(self):
3961 self.create_file("file.txt")
3962 self.create_file("file2.txt")
3963 with os.scandir(self.path) as iterator:
3964 next(iterator)
3965 with self.check_no_resource_warning():
3966 del iterator
3967
3968 def test_context_manager_close(self):
3969 self.create_file("file.txt")
3970 self.create_file("file2.txt")
3971 with os.scandir(self.path) as iterator:
3972 next(iterator)
3973 iterator.close()
3974
3975 def test_context_manager_exception(self):
3976 self.create_file("file.txt")
3977 self.create_file("file2.txt")
3978 with self.assertRaises(ZeroDivisionError):
3979 with os.scandir(self.path) as iterator:
3980 next(iterator)
3981 1/0
3982 with self.check_no_resource_warning():
3983 del iterator
3984
3985 def test_resource_warning(self):
3986 self.create_file("file.txt")
3987 self.create_file("file2.txt")
3988 iterator = os.scandir(self.path)
3989 next(iterator)
3990 with self.assertWarns(ResourceWarning):
3991 del iterator
3992 support.gc_collect()
3993 # exhausted iterator
3994 iterator = os.scandir(self.path)
3995 list(iterator)
3996 with self.check_no_resource_warning():
3997 del iterator
3998
Victor Stinner6036e442015-03-08 01:58:04 +01003999
Ethan Furmancdc08792016-06-02 15:06:09 -07004000class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004001
4002 # Abstracted so it can be overridden to test pure Python implementation
4003 # if a C version is provided.
4004 fspath = staticmethod(os.fspath)
4005
Ethan Furmancdc08792016-06-02 15:06:09 -07004006 def test_return_bytes(self):
4007 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004008 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004009
4010 def test_return_string(self):
4011 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004012 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004013
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004014 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004015 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004016 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004017
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004018 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004019 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4020 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4021
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004022 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004023 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4024 self.assertTrue(issubclass(FakePath, os.PathLike))
4025 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004026
Ethan Furmancdc08792016-06-02 15:06:09 -07004027 def test_garbage_in_exception_out(self):
4028 vapor = type('blah', (), {})
4029 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004030 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004031
4032 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004033 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004034
Brett Cannon044283a2016-07-15 10:41:49 -07004035 def test_bad_pathlike(self):
4036 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004037 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004038 # __fspath__ attribute that is not callable.
4039 c = type('foo', (), {})
4040 c.__fspath__ = 1
4041 self.assertRaises(TypeError, self.fspath, c())
4042 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004043 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004044 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004045
Bar Hareleae87e32019-12-22 11:57:27 +02004046 def test_pathlike_subclasshook(self):
4047 # bpo-38878: subclasshook causes subclass checks
4048 # true on abstract implementation.
4049 class A(os.PathLike):
4050 pass
4051 self.assertFalse(issubclass(FakePath, A))
4052 self.assertTrue(issubclass(FakePath, os.PathLike))
4053
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004054 def test_pathlike_class_getitem(self):
4055 self.assertIs(os.PathLike[bytes], os.PathLike)
4056
Victor Stinnerc29b5852017-11-02 07:28:27 -07004057
4058class TimesTests(unittest.TestCase):
4059 def test_times(self):
4060 times = os.times()
4061 self.assertIsInstance(times, os.times_result)
4062
4063 for field in ('user', 'system', 'children_user', 'children_system',
4064 'elapsed'):
4065 value = getattr(times, field)
4066 self.assertIsInstance(value, float)
4067
4068 if os.name == 'nt':
4069 self.assertEqual(times.children_user, 0)
4070 self.assertEqual(times.children_system, 0)
4071 self.assertEqual(times.elapsed, 0)
4072
4073
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004074# Only test if the C version is provided, otherwise TestPEP519 already tested
4075# the pure Python implementation.
4076if hasattr(os, "_fspath"):
4077 class TestPEP519PurePython(TestPEP519):
4078
4079 """Explicitly test the pure Python implementation of os.fspath()."""
4080
4081 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004082
4083
Fred Drake2e2be372001-09-20 21:33:42 +00004084if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004085 unittest.main()