blob: 2a4ae1573ef59927c64b751ca8d9cf4685d4b147 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dower9eb3d542019-08-21 15:52:42 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Miss Islington (bot)68c1c392019-06-28 09:23:06 -070025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
# True only when the platform exposes geteuid() and the effective user is
# root (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated on FreeBSD thanks to short-circuiting.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists."""
    available = hasattr(os, name)
    return unittest.skipUnless(available, 'requires os.%s' % name)
79
80
def create_file(filename, content=b'content'):
    """Create *filename* holding *content* (bytes).

    The file is opened in exclusive-creation mode, so FileExistsError is
    raised if *filename* already exists.  Buffering is disabled.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
84
85
class MiscTests(unittest.TestCase):
    """Checks for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        self.assertIsInstance(os.getcwd(), str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        target_len = 2000     # characters
        component_len = 200   # characters
        component = 'python_test_dir_'.ljust(component_len, 'a')

        with tempfile.TemporaryDirectory() as tmpdir, \
                support.change_cwd(tmpdir) as path:
            expected = path

            while True:
                cwd = os.getcwd()
                self.assertEqual(cwd, expected)

                missing = target_len - (len(cwd) + len(os.path.sep))
                if missing <= 0:
                    break
                # Shorten the last component so the path does not overshoot
                # the target length (missing is known to be positive here).
                if len(component) > missing:
                    component = component[:missing]

                path = os.path.join(path, component)
                try:
                    os.mkdir(path)
                    # On Windows, chdir() can fail
                    # even if mkdir() succeeded
                    os.chdir(path)
                except OSError as exc:
                    # FileNotFoundError: on Windows, covers
                    # ERROR_PATH_NOT_FOUND (3) and
                    # ERROR_FILENAME_EXCED_RANGE (206) errors
                    # ("The filename or extension is too long").
                    path_too_long = (isinstance(exc, FileNotFoundError)
                                     or exc.errno == errno.ENAMETOOLONG)
                    if not path_too_long:
                        raise
                    break

                expected = path

            if support.verbose:
                print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        cwd_bytes = os.getcwdb()
        self.assertIsInstance(cwd_bytes, bytes)
        self.assertEqual(os.fsdecode(cwd_bytes), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions of the os module, using TESTFN."""

    def setUp(self):
        # Every test starts (and, via tearDown, ends) without a leftover
        # TESTFN.  lexists() is used so a dangling symlink is removed too.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not leak a reference to the path.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a write or assertion fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        # The context manager closes the file object on every exit path.
        with os.fdopen(fd, *args):
            pass

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Read the destination back through a separate handle so the
            # source handle registered for cleanup is not shadowed.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the result objects returned by the os.*stat* family."""

    def setUp(self):
        # Each test works on a fresh 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check the tuple and attribute interfaces of os.stat(fname).

        *fname* may be a str or bytes path to the 3-byte file created by
        setUp().
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            # Check presence first so a missing attribute is reported as an
            # assertion failure rather than an AttributeError.
            self.assertIn(attr, members)
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The attribute is compared to the tuple item after
                # truncating it to an int.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the tuple must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd raises OSError with EBADF
        # instead of crashing
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an int st_file_attributes within 32 bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
def _test_utime_current(self, set_time):
    """Check that *set_time* stamps self.fname with the current time.

    *set_time* is called with self.fname as its only argument.  The
    resulting st_mtime must lie within a tolerance of the system clock
    sampled just before the call.
    """
    # Get the system clock
    now = time.time()

    # Call os.utime() to set the timestamp to the current system clock
    set_time(self.fname)

    if self.support_subsecond(self.fname):
        # On Windows, the usual resolution of time.time() is 15.6 ms.
        # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
        #
        # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
        # also 50 ms on other platforms.
        tolerance = 0.050
    else:
        # Without sub-second timestamps only whole seconds can be compared.
        tolerance = 1.0

    mtime = os.stat(self.fname).st_mtime
    details = ("st_time=%r, current=%r, dt=%r"
               % (mtime, now, mtime - now))
    self.assertAlmostEqual(mtime, now, delta=tolerance, msg=details)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
def get_file_system(self, path):
    """Return the file system name of the volume holding *path*.

    Only implemented for Windows, through GetVolumeInformationW().
    Returns None on every other platform and when the query fails.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    name_buffer = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        drive + '\\', None, 0,
        None, None, None,
        name_buffer, len(name_buffer))
    if not succeeded:
        # return None if the filesystem is unknown
        return None
    return name_buffer.value
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where available, bytes) view of the
        # environment so tearDown() can restore it, then install the
        # reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        # Put back exactly what setUp() captured.
        os.environ.clear()
        os.environ.update(self.__save)
        if not os.supports_bytes_environ:
            return
        os.environb.clear()
        os.environb.update(self.__saveb)

    def _reference(self):
        # Entries installed into os.environ by setUp().
        return dict(KEY1="VALUE1", KEY2="VALUE2", KEY3="VALUE3")

    def _empty_mapping(self):
        # The "empty mapping" is os.environ itself, wiped.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        # A variable set through update() must be visible to a child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as popen:
            output = popen.read()
            self.assertEqual(output.strip(), "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        # Iterating the object returned by os.popen() yields the output
        # line by line and then stops.
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as popen:
            lines = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        # Every reference entry can be read back through get().
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ('{!r}: {!r}'.format(key, value)
                 for key, value in env.items())
        expected = 'environ({{{}}})'.format(', '.join(pairs))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        # Temporarily swap os.environ for a plain dict.
        original_environ = os.environ
        try:
            os.environ = dict(fake_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for bytes_env in ({b'PATH': b'abc'},
                          {b'PATH': 'abc'},
                          {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(bytes_env),
                                     ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s"
                          % (fs_encoding,))
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        def check_key_error(cm):
            # The KeyError must carry the very key object that was looked
            # up, with its exception context suppressed.
            self.assertIs(cm.exception.args[0], missing)
            self.assertTrue(cm.exception.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        check_key_error(cm)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        check_key_error(cm)

    def _test_environ_iteration(self, collection):
        # Start iterating *collection*, add a key to os.environ, and take
        # one more step: the second next() must not raise.
        walker = iter(collection)
        new_key = "__new_key__"

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
1005
Victor Stinner6d101392013-04-14 16:35:04 +02001006
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to tests both functions with the same code base
    def walk(self, top, **kwargs):
        # os.walk() spells the option "followlinks".
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        path_of = os.path.join
        testfn = support.TESTFN

        # Directories.
        self.walk_path = path_of(testfn, "TEST1")
        self.sub1_path = path_of(self.walk_path, "SUB1")
        self.sub11_path = path_of(self.sub1_path, "SUB11")
        sub2_path = path_of(self.walk_path, "SUB2")
        sub21_path = path_of(sub2_path, "SUB21")
        t2_path = path_of(testfn, "TEST2")

        # Regular files.
        tmp1_path = path_of(self.walk_path, "tmp1")
        tmp2_path = path_of(self.sub1_path, "tmp2")
        tmp3_path = path_of(sub2_path, "tmp3")
        # NOTE(review): this file is created as "tmp3" although the diagram
        # and the variable call it tmp5 -- confirm which one is intended.
        tmp5_path = path_of(sub21_path, "tmp3")
        tmp4_path = path_of(testfn, "TEST2", "tmp4")

        # Symbolic links.
        self.link_path = path_of(sub2_path, "link")
        broken_link_path = path_of(sub2_path, "broken_link")
        broken_link2_path = path_of(sub2_path, "broken_link2")
        broken_link3_path = path_of(sub2_path, "broken_link3")

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        for path in (tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path):
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if not support.can_symlink():
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
        else:
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(path_of('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(path_of('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])

        # Strip every permission bit from SUB21.  If it can still be listed,
        # drop it from the tree and from the expected result.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        walked[0][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        # Same pruning scenario, with the top wrapped in a FakePath.
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        walked[3][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        followed = False
        for root, dirs, files in self.walk(self.walk_path,
                                           follow_symlinks=True):
            if root != self.link_path:
                continue
            self.assertEqual(dirs, [])
            self.assertEqual(files, ["tmp4"])
            followed = True
            break
        if not followed:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])

        # Rename SUB1 behind the walker's back: it must report an error,
        # skip the vanished directory and still visit its siblings.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [entry[0] for entry in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 == dir1:
                    continue
                self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)

    def test_walk_many_open_files(self):
        # Step 100 walkers in lockstep over a chain of 30 nested directories.
        depth = 30
        walker_count = 100
        base = os.path.join(support.TESTFN, 'deep')
        deepest = os.path.join(base, *(['d'] * depth))
        os.makedirs(deepest)

        # Bottom-up: the deepest directory comes first and has no kids.
        walkers = [self.walk(base, topdown=False)
                   for _ in range(walker_count)]
        current = deepest
        for level in range(depth + 1):
            expected = (current, ['d'] if level else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.dirname(current)

        # Top-down: the base comes first, the deepest directory last.
        walkers = [self.walk(base, topdown=True)
                   for _ in range(walker_count)]
        current = base
        for level in range(depth + 1):
            expected = (current, ['d'] if level < depth else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.join(current, 'd')
1201
Charles-François Natali7372b062012-02-05 15:15:38 +01001202
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory descriptor so the inherited WalkTests tests see
        # the same 3-tuples that os.walk() yields.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single hook through which every test reaches os.fwalk().
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        # Work on copies: the caller's dicts are left untouched.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Acquire the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and its error must not be masked
        # by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1273
1274
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        # os.walk() spells the option "followlinks".
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            # Hand str names to the test ...
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # ... and copy whatever it did to the lists back into the bytes
            # lists owned by os.walk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1287
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            # Hand str names to the test ...
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # ... and copy whatever it did to the lists back into the bytes
            # lists owned by os.fwalk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1299
Charles-François Natali7372b062012-02-05 15:15:38 +01001300
Guido van Rossume7ba4952007-06-06 23:52:48 +00001301class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001302 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001303 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001304
1305 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001306 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001307 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1308 os.makedirs(path) # Should work
1309 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1310 os.makedirs(path)
1311
1312 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001313 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001314 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1315 os.makedirs(path)
1316 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1317 'dir5', 'dir6')
1318 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001319
Serhiy Storchakae304e332017-03-24 13:27:42 +02001320 def test_mode(self):
1321 with support.temp_umask(0o002):
1322 base = support.TESTFN
1323 parent = os.path.join(base, 'dir1')
1324 path = os.path.join(parent, 'dir2')
1325 os.makedirs(path, 0o555)
1326 self.assertTrue(os.path.exists(path))
1327 self.assertTrue(os.path.isdir(path))
1328 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001329 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1330 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001331
Terry Reedy5a22b652010-12-02 07:05:56 +00001332 def test_exist_ok_existing_directory(self):
1333 path = os.path.join(support.TESTFN, 'dir1')
1334 mode = 0o777
1335 old_mask = os.umask(0o022)
1336 os.makedirs(path, mode)
1337 self.assertRaises(OSError, os.makedirs, path, mode)
1338 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001339 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001340 os.makedirs(path, mode=mode, exist_ok=True)
1341 os.umask(old_mask)
1342
Martin Pantera82642f2015-11-19 04:48:44 +00001343 # Issue #25583: A drive root could raise PermissionError on Windows
1344 os.makedirs(os.path.abspath('/'), exist_ok=True)
1345
def test_exist_ok_s_isgid_directory(self):
    # exist_ok=True must cope with the set-group-ID bit being present or
    # absent on an already existing directory.
    setgid_bit = stat.S_ISGID
    target = os.path.join(support.TESTFN, 'dir1')
    wanted_mode = 0o777
    saved_umask = os.umask(0o022)
    try:
        parent_mode = stat.S_IMODE(os.lstat(support.TESTFN).st_mode)
        try:
            os.chmod(support.TESTFN, parent_mode | setgid_bit)
        except PermissionError:
            raise unittest.SkipTest('Cannot set S_ISGID for dir.')
        parent_bits = os.lstat(support.TESTFN).st_mode & setgid_bit
        if parent_bits != setgid_bit:
            raise unittest.SkipTest('No support for S_ISGID dir mode.')
        # The os should apply S_ISGID from the parent dir for us, but
        # this test need not depend on that behavior.  Be explicit.
        os.makedirs(target, wanted_mode | setgid_bit)
        # http://bugs.python.org/issue14992
        # Should not fail when the bit is already set.
        os.makedirs(target, wanted_mode, exist_ok=True)
        # remove the bit.
        current_mode = stat.S_IMODE(os.lstat(target).st_mode)
        os.chmod(target, current_mode & ~setgid_bit)
        # May work even when the bit is not already set when demanded.
        os.makedirs(target, wanted_mode | setgid_bit, exist_ok=True)
    finally:
        os.umask(saved_umask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001372
def test_exist_ok_existing_regular_file(self):
    # A regular file occupying the target path makes os.makedirs() fail
    # whatever the value of exist_ok.
    path = os.path.join(support.TESTFN, 'dir1')
    with open(path, 'w') as stream:
        stream.write('abc')
    for options in ({}, {'exist_ok': False}, {'exist_ok': True}):
        with self.assertRaises(OSError):
            os.makedirs(path, **options)
    os.remove(path)
1382
def tearDown(self):
    # If the tests failed, the bottom-most directory ('../dir6') may not
    # have been created, so walk upwards until a path that exists (or
    # support.TESTFN itself) is reached and prune from there.
    candidate = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                             'dir4', 'dir5', 'dir6')
    while True:
        if candidate == support.TESTFN or os.path.exists(candidate):
            break
        candidate = os.path.dirname(candidate)

    os.removedirs(candidate)
1393
Andrew Svetlov405faed2012-12-25 12:18:09 +02001394
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    # All tests operate on a directory created at support.TESTFN for the
    # lifetime of the class.

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and confirm each change took effect.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls stay inside the context manager: it is enough for
        # either of them to raise PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1453
1454
class RemoveDirsTests(unittest.TestCase):
    # Each test builds support.TESTFN/dira/dirb and then checks how far up
    # the tree os.removedirs() prunes.

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths as (dira, dirb).
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        dira, dirb = self._make_nested_dirs()
        # A file next to dirb keeps dira (and everything above) alive.
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        dira, dirb = self._make_nested_dirs()
        # A file inside dirb makes even the leaf directory non-removable.
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1494
1495
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        # Writing to os.devnull must succeed and reading from it must
        # yield no data.  The 'with' statement closes the file, so no
        # explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001503
Andrew Svetlov405faed2012-12-25 12:18:09 +02001504
Guido van Rossume7ba4952007-06-06 23:52:48 +00001505class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001506 def test_urandom_length(self):
1507 self.assertEqual(len(os.urandom(0)), 0)
1508 self.assertEqual(len(os.urandom(1)), 1)
1509 self.assertEqual(len(os.urandom(10)), 10)
1510 self.assertEqual(len(os.urandom(100)), 100)
1511 self.assertEqual(len(os.urandom(1000)), 1000)
1512
1513 def test_urandom_value(self):
1514 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001515 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001516 data2 = os.urandom(16)
1517 self.assertNotEqual(data1, data2)
1518
1519 def get_urandom_subprocess(self, count):
1520 code = '\n'.join((
1521 'import os, sys',
1522 'data = os.urandom(%s)' % count,
1523 'sys.stdout.buffer.write(data)',
1524 'sys.stdout.buffer.flush()'))
1525 out = assert_python_ok('-c', code)
1526 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001527 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001528 return stdout
1529
1530 def test_urandom_subprocess(self):
1531 data1 = self.get_urandom_subprocess(16)
1532 data2 = self.get_urandom_subprocess(16)
1533 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001534
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001535
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        payload = os.getrandom(16)
        self.assertIsInstance(payload, bytes)
        self.assertEqual(len(payload), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1577
1578
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as any one of the corresponding build-time flags equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001585
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    # Every scenario runs in a child interpreter so that tampering with
    # file descriptors or resource limits cannot damage the test runner.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful exit of the child matters here; its output is
        # not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two reads inside one child must differ ...
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        # ... and so must the output of two separate children.
        self.assertNotEqual(out2, out)
1665
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001666
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that temporarily replaces os.execv and os.execve.

    The value bound by 'with ... as' is a list that receives one
    (function name, first arg, remaining args) tuple per call made to the
    replacements.  The replacements always raise, since a real exec call
    would never return.  When defpath is not None, os.defpath is
    overridden as well.  All three attributes are restored on exit.
    """
    recorded = []

    def mock_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = (os.execv, os.execve, os.defpath)
        os.execv, os.execve = mock_execv, mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1699
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument lists and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) while os.execv/os.execve are replaced by recorders.
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath,
                             (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            self.assertTrue(exc.winerror is None or exc.winerror != 0)
        else:
            self.fail('No OSError raised')
1804
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001805
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test expects OSError from an os function applied to
    # support.TESTFN; setUp() insists that the path does not exist.

    def setUp(self):
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: nothing is in the way.
            return
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Exclusively create a file at the path, then try to create a
        # directory with the same name while the file is still open.
        with open(support.TESTFN, "x"):
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001840
Victor Stinnere77c9742016-03-25 10:28:23 +01001841
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001842class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001843 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001844 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1845 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001846 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001847 def get_single(f):
1848 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001849 if hasattr(os, f):
1850 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001851 return helper
1852 for f in singles:
1853 locals()["test_"+f] = get_single(f)
1854
Benjamin Peterson7522c742009-01-19 21:00:09 +00001855 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001856 try:
1857 f(support.make_bad_fd(), *args)
1858 except OSError as e:
1859 self.assertEqual(e.errno, errno.EBADF)
1860 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001861 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001862 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001863
Serhiy Storchaka43767632013-11-03 21:31:38 +02001864 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001865 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001866 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001867
Serhiy Storchaka43767632013-11-03 21:31:38 +02001868 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001869 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001870 fd = support.make_bad_fd()
1871 # Make sure none of the descriptors we are about to close are
1872 # currently valid (issue 6542).
1873 for i in range(10):
1874 try: os.fstat(fd+i)
1875 except OSError:
1876 pass
1877 else:
1878 break
1879 if i < 2:
1880 raise unittest.SkipTest(
1881 "Unable to acquire a range of invalid file descriptors")
1882 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001883
Serhiy Storchaka43767632013-11-03 21:31:38 +02001884 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001885 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001886 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001887
Serhiy Storchaka43767632013-11-03 21:31:38 +02001888 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001889 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001890 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001891
Serhiy Storchaka43767632013-11-03 21:31:38 +02001892 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001893 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001894 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001895
Serhiy Storchaka43767632013-11-03 21:31:38 +02001896 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001897 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001898 self.check(os.pathconf, "PC_NAME_MAX")
1899 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001900
Serhiy Storchaka43767632013-11-03 21:31:38 +02001901 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001902 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001903 self.check(os.truncate, 0)
1904 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001905
Serhiy Storchaka43767632013-11-03 21:31:38 +02001906 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001907 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001908 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001909
Serhiy Storchaka43767632013-11-03 21:31:38 +02001910 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001911 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001912 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001913
Victor Stinner57ddf782014-01-08 15:21:28 +01001914 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1915 def test_readv(self):
1916 buf = bytearray(10)
1917 self.check(os.readv, [buf])
1918
Serhiy Storchaka43767632013-11-03 21:31:38 +02001919 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001920 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001921 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001922
Serhiy Storchaka43767632013-11-03 21:31:38 +02001923 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001924 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001925 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001926
Victor Stinner57ddf782014-01-08 15:21:28 +01001927 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1928 def test_writev(self):
1929 self.check(os.writev, [b'abc'])
1930
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001931 def test_inheritable(self):
1932 self.check(os.get_inheritable)
1933 self.check(os.set_inheritable, True)
1934
1935 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1936 'needs os.get_blocking() and os.set_blocking()')
1937 def test_blocking(self):
1938 self.check(os.get_blocking)
1939 self.check(os.set_blocking, True)
1940
Brian Curtin1b9df392010-11-24 20:24:31 +00001941
class LinkTests(unittest.TestCase):
    # os.link() tests; the two paths used are support.TESTFN and
    # support.TESTFN + "2" unless a test overrides them.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for leftover in (self.file1, self.file2):
            if os.path.exists(leftover):
                os.unlink(leftover)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # open the same file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1978
Serhiy Storchaka43767632013-11-03 21:31:38 +02001979@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1980class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001981 # uid_t and gid_t are 32-bit unsigned integers on Linux
1982 UID_OVERFLOW = (1 << 32)
1983 GID_OVERFLOW = (1 << 32)
1984
Serhiy Storchaka43767632013-11-03 21:31:38 +02001985 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1986 def test_setuid(self):
1987 if os.getuid() != 0:
1988 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001989 self.assertRaises(TypeError, os.setuid, 'not an int')
1990 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001991
@unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
def test_setgid(self):
    # The failure check is skipped for uid 0 and when HAVE_WHEEL_GROUP
    # is set.
    if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
        with self.assertRaises(OSError):
            os.setgid(0)
    with self.assertRaises(TypeError):
        os.setgid('not an int')
    with self.assertRaises(OverflowError):
        os.setgid(self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001998
Serhiy Storchaka43767632013-11-03 21:31:38 +02001999 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2000 def test_seteuid(self):
2001 if os.getuid() != 0:
2002 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002003 self.assertRaises(TypeError, os.setegid, 'not an int')
2004 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002005
@unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
def test_setegid(self):
    # The failure check is skipped for uid 0 and when HAVE_WHEEL_GROUP
    # is set.
    if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
        with self.assertRaises(OSError):
            os.setegid(0)
    with self.assertRaises(TypeError):
        os.setegid('not an int')
    with self.assertRaises(OverflowError):
        os.setegid(self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002012
Serhiy Storchaka43767632013-11-03 21:31:38 +02002013 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2014 def test_setreuid(self):
2015 if os.getuid() != 0:
2016 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002017 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2018 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2019 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2020 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002021
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2023 def test_setreuid_neg1(self):
2024 # Needs to accept -1. We run this in a subprocess to avoid
2025 # altering the test runner's process state (issue8045).
2026 subprocess.check_call([
2027 sys.executable, '-c',
2028 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002029
Serhiy Storchaka43767632013-11-03 21:31:38 +02002030 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2031 def test_setregid(self):
2032 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2033 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002034 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2035 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2036 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2037 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002038
Serhiy Storchaka43767632013-11-03 21:31:38 +02002039 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2040 def test_setregid_neg1(self):
2041 # Needs to accept -1. We run this in a subprocess to avoid
2042 # altering the test runner's process state (issue8045).
2043 subprocess.check_call([
2044 sys.executable, '-c',
2045 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002046
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Run os functions over file names that are awkward to encode.

    setUp() fills a scratch directory with empty files whose names come
    from the non-ASCII / unencodable test names provided by ``support``;
    each test then walks ``self.unicodefn``, the decoded names.
    """

    def setUp(self):
        # Prefer the most demanding directory name that is available.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        encoded_names = []
        for candidate in candidates:
            try:
                encoded_names.append(os.fsencode(candidate))
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: leave out.
                continue
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002118
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: plain exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_attempts * 0.1 seconds.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper process and expect it to exit.

        *name* is only used in the failure message.  The helper script
        signals readiness by writing 1 into a one-byte shared memory block
        identified by a unique tag name.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared memory block when the test finishes.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # It may already have exited on its own, in which case there
            # is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; an
        # exit status of 0 also means that it has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2233
2234
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Build TESTFN/SUB0, TESTFN/FILE0, TESTFN/SUB1 and TESTFN/FILE1 and
        # remember the bare entry names, sorted, for the comparisons below.
        self.created_paths = entries = []
        for index in range(2):
            subdir = 'SUB%d' % index
            subdir_path = os.path.join(support.TESTFN, subdir)
            filename = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, filename)
            os.makedirs(subdir_path)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            entries += [subdir, filename]
        entries.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        extended = '\\\\?\\' + os.path.abspath(support.TESTFN)
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)

        # bytes
        extended = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
        expected = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(sorted(os.listdir(extended)), expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002281
2282
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths (both str or both bytes) are the same.

        Both sides are case-normalized; on Windows a leading extended-path
        prefix is dropped from either side before comparing.
        """
        def without_prefix(path):
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
            return path[4:] if path.startswith(prefix) else path

        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        wrapped_target = FakePath(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(wrapped_target)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        wrapped_link = FakePath(self.filelink)
        resolved = os.readlink(wrapped_link)
        self.assertPathEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2342
2343
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic link handling on Windows.

    The links are created under fixed relative names, so setUp() checks
    that none of them exist yet and tearDown() removes whatever a test
    left behind.
    """
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # unittest assertions are used instead of assert statements so the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                            and os.path.exists(r'C:\ProgramData'),
                            'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            variants = ((src, dest), (os.fsencode(src), os.fsencode(dest)))
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    pass
                else:
                    # Best-effort removal of a link that did get created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2520
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions are used instead of assert statements so the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2554
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        """Check that path lookups do not change the process handle count.

        The count reported by GetProcessHandleCount() is sampled before and
        after repeated nt._getfinalpathname() and os.stat() calls, and the
        two samples must be equal.
        """
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = ctypes.wintypes.DWORD()

        def sample_handle_count():
            # Refresh handle_count in place and return its current value.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = sample_handle_count()

        # The first three test the error path, __file__ tests the success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        # Failure is expected
                        pass

        handle_delta = sample_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002604
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Tests for symlinks whose target is given relative to the link itself."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion rather than an assert statement, so the
        # check is not stripped when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
2635
2636
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), b'abc\xff')
        self.assertEqual(os.fsdecode(text), 'abc\u0141')

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not encodable with the filesystem encoding: nothing to check.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002650
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002651
Brett Cannonefb00c02012-02-29 18:31:31 -05002652
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 attached to a terminal, an encoding name must be
        # reported and it must be one that the codecs registry knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2666
2667
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child = subprocess.Popen(
            [sys.executable, '-c', 'import os; print(os.getppid())'],
            stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode):
        """Run *code* in a spawned interpreter and check os.waitpid()'s result.

        The status returned by os.waitpid() must encode *exitcode*, and the
        returned pid must be the pid that os.spawnv() returned.
        """
        on_windows = sys.platform == 'win32'
        if on_windows:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            args = [f'"{sys.executable}"', '-c', f'"{code}"']
        else:
            args = [sys.executable, '-c', code]
        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)

        waited_pid, status = os.waitpid(pid, 0)
        if on_windows:
            self.assertEqual(status, exitcode << 8)
        else:
            self.assertTrue(os.WIFEXITED(status), status)
            self.assertEqual(os.WEXITSTATUS(status), exitcode)
        self.assertEqual(waited_pid, pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitpid_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002709
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002710
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv that runs it.

        The script exits with ``self.exitcode``.  With *with_env*, a copy of
        os.environ holding one extra unique key is stored in ``self.env`` and
        the script reads that key before exiting.  With *use_bytes*, the
        arguments (and ``self.env``, when it was created) are encoded with
        os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* (a spawnve-style callable) treats odd env entries.

        For each malformed entry the call must either raise ValueError or
        return exit code 127.  An '=' inside a value is legal and must reach
        the child unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                try:
                    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
                except ValueError:
                    pass
                else:
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2889
Serhiy Storchaka77703942017-06-25 07:33:01 +03002890
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        """os.getlogin() must return a non-empty name."""
        user_name = os.getlogin()
        self.assertTrue(user_name)
2899
2900
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raise our own priority value by one and read it back."""
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            else:
                self.assertEqual(new_prio, base + 1)
        finally:
            # Best-effort restore: an EACCES failure is tolerated, anything
            # else is a real error.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2923
2924
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """asyncore listening socket that runs its loop in its own thread.

    The accepted connection is wrapped in a Handler, which records the bytes
    it receives so that tests can inspect them afterwards.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that buffers everything it reads."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When False, received data is read but not stored.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all the bytes accumulated so far."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Propagate the active exception instead of swallowing it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to finish and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # "with" guarantees the lock is released even if the loop raises.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return False

    def handle_error(self):
        # Propagate the active exception instead of swallowing it.
        raise
3009
3010
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET included) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003215
3216
def supports_extended_attributes():
    """Return True if an extended attribute can be set on a new TESTFN file.

    Returns False when os.setxattr is missing or when setting "user.test"
    on the freshly created file raises OSError.  The file is removed again
    in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    try:
        with open(support.TESTFN, "xb", 0) as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)

    return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003231
3232
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr(), os.setxattr(), os.removexattr(), os.listxattr()."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Exercise the four xattr callables on a freshly created TESTFN.

        *s* converts attribute names to the type under test (str or
        os.fsencode); *kwargs* are forwarded to getxattr, setxattr and
        removexattr.
        """
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        setxattr(fn, s("user.test"), b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        removexattr(fn, s("user.test"), **kwargs)

        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        many = sorted(f"user.test{i}" for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run _check_xattrs_str with str names, then with bytes names."""
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)

        self._check_xattrs_str(os.fsencode, *args, **kwargs)
        support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same checks, but each call goes through an open file descriptor.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3316
3317
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping the test when the
        size cannot be queried."""
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that a failing stty does not clutter
            # the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL, text=True
            ).split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3361
3362
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    # Exercises os.memfd_create() with an explicit flag and with defaults.

    def test_memfd_create(self):
        # Explicit MFD_CLOEXEC: the descriptor is valid, non-inheritable
        # and usable as an ordinary writable file.
        cloexec_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(cloexec_fd, -1)
        self.addCleanup(os.close, cloexec_fd)
        self.assertFalse(os.get_inheritable(cloexec_fd))
        # closefd=False: the cleanup registered above owns the descriptor.
        with open(cloexec_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # No flags passed: the descriptor must still be non-inheritable.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3378
3379
class OSErrorTests(unittest.TestCase):
    # Verifies that an OSError raised by a path-taking os function carries,
    # as its filename attribute, the very object the caller passed in.

    def setUp(self):
        class Str(str):
            pass

        # Prefer the "difficult" names exported by test.support when they
        # exist; fall back to the plain TESTFN otherwise.
        unencodable = support.TESTFN_UNENCODABLE
        text_name = support.TESTFN if unencodable is None else unencodable

        undecodable = support.TESTFN_UNDECODABLE
        if undecodable is None:
            raw_name = os.fsencode(support.TESTFN)
        else:
            raw_name = undecodable

        self.bytes_filenames = [
            raw_name,
            bytearray(raw_name),
            memoryview(raw_name),
        ]
        self.unicode_filenames = [text_name, Str(text_name)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"

        # Each case is (candidate filenames, function, *extra arguments).
        cases = [
            (self.filenames, os.chdir,),
            (self.filenames, os.chmod, 0o777),
            (self.filenames, os.lstat,),
            (self.filenames, os.open, os.O_RDONLY),
            (self.filenames, os.rmdir,),
            (self.filenames, os.stat,),
            (self.filenames, os.unlink,),
        ]
        if on_windows:
            # On Windows the destination type is matched to the source
            # type, and listdir is only exercised with str names.
            cases += [
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                (self.unicode_filenames, os.listdir, ),
            ]
        else:
            cases += [
                (self.filenames, os.listdir,),
                (self.filenames, os.rename, "dst"),
                (self.filenames, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((self.filenames, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                cases.append((self.bytes_filenames, os.link, b"dst"))
                cases.append((self.unicode_filenames, os.link, "dst"))
            else:
                cases.append((self.filenames, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (self.filenames, os.listxattr,),
                (self.filenames, os.getxattr, "user.test"),
                (self.filenames, os.setxattr, "user.test", b'user'),
                (self.filenames, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((self.filenames, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            cases.append((self.filenames, os.readlink,))

        for candidates, func, *extra_args in cases:
            for name in candidates:
                try:
                    if not isinstance(name, (str, bytes)):
                        # bytearray/memoryview names are expected to emit
                        # a DeprecationWarning in addition to failing.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra_args)
                    else:
                        func(name, *extra_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3472
class CPUCountTests(unittest.TestCase):
    # os.cpu_count() returns either None or a positive int.

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3481
Victor Stinnerdaf45552013-08-28 00:53:59 +02003482
class FDInheritanceTests(unittest.TestCase):
    # Checks the inheritable flag of descriptors produced by os functions
    # and the behaviour of os.get_inheritable() / os.set_inheritable().

    def _open_readonly(self):
        # Open this test file read-only and register the descriptor for
        # closing at test teardown.
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        return handle

    def test_get_set_inheritable(self):
        handle = self._open_readonly()
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = self._open_readonly()
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(handle, fcntl.F_GETFD)
        fcntl.fcntl(handle, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = self._open_readonly()
        cloexec = fcntl.FD_CLOEXEC
        self.assertEqual(fcntl.fcntl(handle, fcntl.F_GETFD) & cloexec,
                         cloexec)

        os.set_inheritable(handle, True)
        self.assertEqual(fcntl.fcntl(handle, fcntl.F_GETFD) & cloexec,
                         0)

    def test_open(self):
        handle = self._open_readonly()
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source = self._open_readonly()

        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, source)
        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_readonly()

        # inheritable by default
        default_target = self._open_readonly()
        self.assertEqual(os.dup2(source, default_target), default_target)
        self.assertTrue(os.get_inheritable(default_target))

        # force non-inheritable
        private_target = self._open_readonly()
        self.assertEqual(os.dup2(source, private_target, inheritable=False),
                         private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3575
3576
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        text_name = support.TESTFN
        # No bytes variants are exercised on Windows.
        if os.name == 'nt':
            raw_name = raw_pathlike = None
        else:
            raw_name = support.TESTFN.encode('ascii')
            raw_pathlike = FakePath(raw_name)
        fd = os.open(FakePath(text_name), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_pathlike = FakePath(fd)
        text_pathlike = FakePath(text_name)

        # Every path spelling that each function must accept.
        accepted = [
            candidate
            for candidate in (text_name, raw_name, text_pathlike, raw_pathlike)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                # Silently pass over functions missing on this platform.
                if not hasattr(os, name):
                    continue
                fn = getattr(os, name)

                def invoke(argument):
                    # Call fn and hand its result to the cleanup hook, if any.
                    outcome = fn(argument, *extra_args)
                    if cleanup_fn is not None:
                        cleanup_fn(outcome)

                for path in accepted:
                    with self.subTest(name=name, path=path):
                        invoke(path)

                # A path-like whose __fspath__() returns an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_pathlike, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    invoke(fd)  # should not fail

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        # (value returned by __fspath__(), type name expected in the message)
        bad_returns = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for bad_value, type_name in bad_returns:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(bad_value))
3641
Brett Cannon3f9183b2016-08-26 14:44:48 -07003642
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    # Round-trips the blocking flag of a file descriptor.

    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch to non-blocking and back, checking after each change.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
3656
3657
Yury Selivanov97e2e062014-09-26 12:33:06 -04003658
class ExportsTests(unittest.TestCase):
    # Spot-checks that os.__all__ lists a couple of well-known names.

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3663
3664
class TestScandir(unittest.TestCase):
    # Tests for os.scandir() and the os.DirEntry objects it yields.  Each
    # test runs against a fresh directory created in setUp().

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Create the scratch directory and keep both its str and bytes
        # spellings; the directory tree is removed at teardown.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create a file called *name* inside the scratch directory and
        # return its full path.  The directory spelling (str or bytes) is
        # chosen to match the type of *name*.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that the sorted entry names
        # equal *names*, and return a {name: DirEntry} mapping.
        entries = dict((entry.name, entry)
                       for entry in os.scandir(self.path))
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  When *skip_fields* is true, only the
        # st_* attributes are compared one by one, ignoring st_dev, st_ino
        # and st_nlink; otherwise the two objects are compared as a whole.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Cross-check every DirEntry accessor against the equivalent
        # os.stat() / os.path call on entry.path.
        # NOTE: the is_dir and is_file parameters are accepted but not
        # referenced in this method; only is_symlink is used (to decide
        # whether stat fields are skipped on Windows).
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Accessors that follow symlinks.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Accessors that do not follow symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        # Populate the directory with a subdirectory, a regular file and,
        # where supported, a hard link and two symlinks; then validate the
        # DirEntry of each.
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the single DirEntry in the scratch directory, asserting
        # that there is exactly one and that it is called *name*.  The
        # directory is scanned with a path of the same type as *name*.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields for it.
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = dict((entry.name, entry) for entry in os.scandir())
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        # An entry obtained from a bytes path must yield a bytes fspath.
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path),bytes_filename))

    def test_removed_dir(self):
        # Query a DirEntry after its directory has been removed.
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        # Query a DirEntry after its file has been removed.
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        # Scanning a bytes path must produce bytes names and paths.
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        # bytearray and memoryview paths are expected to emit a
        # DeprecationWarning while still producing bytes names and paths.
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        # Scan a directory through an open file descriptor.
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # With an fd as the scan root, entry.path is just the name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        # An explicitly closed, partially consumed iterator must not emit
        # a ResourceWarning when it is deleted.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        # Leaving the with block must leave nothing to warn about when
        # the iterator is deleted.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        # Calling close() inside the with block must not break its exit.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        # Same as test_context_manager, but the with block is left
        # through an exception.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        # Deleting a partially consumed, unclosed iterator must emit a
        # ResourceWarning.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3997
Victor Stinner6036e442015-03-08 01:58:04 +01003998
class TestPEP519(unittest.TestCase):
    # Tests for the file system path protocol (os.fspath, os.PathLike).

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes inputs come back equal to themselves.
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str inputs come back equal to themselves.
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        # os.fsencode()/os.fsdecode() accept path-like objects wrapping
        # either str or bytes.
        for wrapped in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(wrapped)

            self.assertEqual(wrapped, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are not str, bytes or path-like are rejected.
        vapor = type('blah', (), {})
        for garbage in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, garbage)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        raising = FakePath(ZeroDivisionError())
        self.assertRaises(ZeroDivisionError, self.fspath, raising)

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))
4052
Victor Stinnerc29b5852017-11-02 07:28:27 -07004053
class TimesTests(unittest.TestCase):
    # Checks the type and fields of the os.times() result.

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every documented field must be exposed as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            for zero_field in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(result, zero_field), 0)
4068
4069
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004070# Only test if the C version is provided, otherwise TestPEP519 already tested
4071# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # The inherited tests call self.fspath, so rebinding it here runs
        # the whole TestPEP519 suite against os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004078
4079
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()