blob: 9e3a1695dfb34ec162c810ca1c8cbbea0f9f76ae [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
R David Murrayf2ad1732014-12-25 18:36:56 -050060root_in_posix = False
61if hasattr(os, 'geteuid'):
62 root_in_posix = (os.geteuid() == 0)
63
Mark Dickinson7cf03892010-04-16 13:45:35 +000064# Detect whether we're on a Linux system that uses the (now outdated
65# and unmaintained) linuxthreads threading library. There's an issue
66# when combining linuxthreads with a failed execv call: see
67# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020068if hasattr(sys, 'thread_info') and sys.thread_info.version:
69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
70else:
71 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000072
Stefan Krahebee49a2013-01-17 15:31:00 +010073# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
Berker Peksag4af23d72016-09-15 20:32:44 +030077def requires_os_func(name):
78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
79
80
Victor Stinnerae39d232016-03-24 17:12:55 +010081def create_file(filename, content=b'content'):
82 with open(filename, "xb", 0) as fp:
83 fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
151class FileTests(unittest.TestCase):
152 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000153 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000155 tearDown = setUp
156
157 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000159 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000160 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161
Christian Heimesfdab48e2008-01-20 09:06:41 +0000162 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
164 # We must allocate two consecutive file descriptors, otherwise
165 # it will mess up other file descriptors (perhaps even the three
166 # standard ones).
167 second = os.dup(first)
168 try:
169 retries = 0
170 while second != first + 1:
171 os.close(first)
172 retries += 1
173 if retries > 10:
174 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000175 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000176 first, second = second, os.dup(second)
177 finally:
178 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000179 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000180 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000181 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000182
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000183 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000184 def test_rename(self):
185 path = support.TESTFN
186 old = sys.getrefcount(path)
187 self.assertRaises(TypeError, os.rename, path, 0)
188 new = sys.getrefcount(path)
189 self.assertEqual(old, new)
190
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000191 def test_read(self):
192 with open(support.TESTFN, "w+b") as fobj:
193 fobj.write(b"spam")
194 fobj.flush()
195 fd = fobj.fileno()
196 os.lseek(fd, 0, 0)
197 s = os.read(fd, 4)
198 self.assertEqual(type(s), bytes)
199 self.assertEqual(s, b"spam")
200
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200201 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200202 # Skip the test on 32-bit platforms: the number of bytes must fit in a
203 # Py_ssize_t type
204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
205 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
207 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200208 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200210
211 # Issue #21932: Make sure that os.read() does not raise an
212 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213 with open(support.TESTFN, "rb") as fp:
214 data = os.read(fp.fileno(), size)
215
Victor Stinner8c663fd2017-11-08 14:44:44 -0800216 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200217 # operating system is free to return less bytes than requested.
218 self.assertEqual(data, b'test')
219
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000220 def test_write(self):
221 # os.write() accepts bytes- and buffer-like objects but not strings
222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
223 self.assertRaises(TypeError, os.write, fd, "beans")
224 os.write(fd, b"bacon\n")
225 os.write(fd, bytearray(b"eggs\n"))
226 os.write(fd, memoryview(b"spam\n"))
227 os.close(fd)
228 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000229 self.assertEqual(fobj.read().splitlines(),
230 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000231
Victor Stinnere0daff12011-03-20 23:36:35 +0100232 def write_windows_console(self, *args):
233 retcode = subprocess.call(args,
234 # use a new console to not flood the test output
235 creationflags=subprocess.CREATE_NEW_CONSOLE,
236 # use a shell to hide the console window (SW_HIDE)
237 shell=True)
238 self.assertEqual(retcode, 0)
239
240 @unittest.skipUnless(sys.platform == 'win32',
241 'test specific to the Windows console')
242 def test_write_windows_console(self):
243 # Issue #11395: the Windows console returns an error (12: not enough
244 # space error) on writing into stdout if stdout mode is binary and the
245 # length is greater than 66,000 bytes (or less, depending on heap
246 # usage).
247 code = "print('x' * 100000)"
248 self.write_windows_console(sys.executable, "-c", code)
249 self.write_windows_console(sys.executable, "-u", "-c", code)
250
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000251 def fdopen_helper(self, *args):
252 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253 f = os.fdopen(fd, *args)
254 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000255
256 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
258 os.close(fd)
259
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000260 self.fdopen_helper()
261 self.fdopen_helper('r')
262 self.fdopen_helper('r', 100)
263
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100264 def test_replace(self):
265 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100266 self.addCleanup(support.unlink, support.TESTFN)
267 self.addCleanup(support.unlink, TESTFN2)
268
269 create_file(support.TESTFN, b"1")
270 create_file(TESTFN2, b"2")
271
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100272 os.replace(support.TESTFN, TESTFN2)
273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
274 with open(TESTFN2, 'r') as f:
275 self.assertEqual(f.read(), "1")
276
Martin Panterbf19d162015-09-09 01:01:13 +0000277 def test_open_keywords(self):
278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
279 dir_fd=None)
280 os.close(f)
281
282 def test_symlink_keywords(self):
283 symlink = support.get_attribute(os, "symlink")
284 try:
285 symlink(src='target', dst=support.TESTFN,
286 target_is_directory=False, dir_fd=None)
287 except (NotImplementedError, OSError):
288 pass # No OS support or unprivileged user
289
Pablo Galindoaac4d032019-05-31 19:39:47 +0100290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
291 def test_copy_file_range_invalid_values(self):
292 with self.assertRaises(ValueError):
293 os.copy_file_range(0, 1, -10)
294
295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
296 def test_copy_file_range(self):
297 TESTFN2 = support.TESTFN + ".3"
298 data = b'0123456789'
299
300 create_file(support.TESTFN, data)
301 self.addCleanup(support.unlink, support.TESTFN)
302
303 in_file = open(support.TESTFN, 'rb')
304 self.addCleanup(in_file.close)
305 in_fd = in_file.fileno()
306
307 out_file = open(TESTFN2, 'w+b')
308 self.addCleanup(support.unlink, TESTFN2)
309 self.addCleanup(out_file.close)
310 out_fd = out_file.fileno()
311
312 try:
313 i = os.copy_file_range(in_fd, out_fd, 5)
314 except OSError as e:
315 # Handle the case in which Python was compiled
316 # in a system with the syscall but without support
317 # in the kernel.
318 if e.errno != errno.ENOSYS:
319 raise
320 self.skipTest(e)
321 else:
322 # The number of copied bytes can be less than
323 # the number of bytes originally requested.
324 self.assertIn(i, range(0, 6));
325
326 with open(TESTFN2, 'rb') as in_file:
327 self.assertEqual(in_file.read(), data[:i])
328
329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
330 def test_copy_file_range_offset(self):
331 TESTFN4 = support.TESTFN + ".4"
332 data = b'0123456789'
333 bytes_to_copy = 6
334 in_skip = 3
335 out_seek = 5
336
337 create_file(support.TESTFN, data)
338 self.addCleanup(support.unlink, support.TESTFN)
339
340 in_file = open(support.TESTFN, 'rb')
341 self.addCleanup(in_file.close)
342 in_fd = in_file.fileno()
343
344 out_file = open(TESTFN4, 'w+b')
345 self.addCleanup(support.unlink, TESTFN4)
346 self.addCleanup(out_file.close)
347 out_fd = out_file.fileno()
348
349 try:
350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
351 offset_src=in_skip,
352 offset_dst=out_seek)
353 except OSError as e:
354 # Handle the case in which Python was compiled
355 # in a system with the syscall but without support
356 # in the kernel.
357 if e.errno != errno.ENOSYS:
358 raise
359 self.skipTest(e)
360 else:
361 # The number of copied bytes can be less than
362 # the number of bytes originally requested.
363 self.assertIn(i, range(0, bytes_to_copy+1));
364
365 with open(TESTFN4, 'rb') as in_file:
366 read = in_file.read()
367 # seeked bytes (5) are zero'ed
368 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
369 # 012 are skipped (in_skip)
370 # 345678 are copied in the file (in_skip + bytes_to_copy)
371 self.assertEqual(read[out_seek:],
372 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Steve Dower772ec0f2019-09-04 14:42:54 -0700594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
595 def test_stat_block_device(self):
596 # bpo-38030: os.stat fails for block devices
597 # Test a filename like "//./C:"
598 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
599 result = os.stat(fname)
600 self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
723 def _test_utime_current(self, set_time):
724 # Get the system clock
725 current = time.time()
726
727 # Call os.utime() to set the timestamp to the current system clock
728 set_time(self.fname)
729
730 if not self.support_subsecond(self.fname):
731 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700732 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200733 # On Windows, the usual resolution of time.time() is 15.6 ms.
734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 #
736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
737 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200738 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200739 st = os.stat(self.fname)
740 msg = ("st_time=%r, current=%r, dt=%r"
741 % (st.st_mtime, current, st.st_mtime - current))
742 self.assertAlmostEqual(st.st_mtime, current,
743 delta=delta, msg=msg)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
757 def get_file_system(self, path):
758 if sys.platform == 'win32':
759 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
760 import ctypes
761 kernel32 = ctypes.windll.kernel32
762 buf = ctypes.create_unicode_buffer("", 100)
763 ok = kernel32.GetVolumeInformationW(root, None, 0,
764 None, None, None,
765 buf, len(buf))
766 if ok:
767 return buf.value
768 # return None if the filesystem is unknown
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000827class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000828 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000829 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000830
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000831 def setUp(self):
832 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000834 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000835 for key, value in self._reference().items():
836 os.environ[key] = value
837
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000838 def tearDown(self):
839 os.environ.clear()
840 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000841 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000842 os.environb.clear()
843 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000844
Christian Heimes90333392007-11-01 19:08:42 +0000845 def _reference(self):
846 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
847
848 def _empty_mapping(self):
849 os.environ.clear()
850 return os.environ
851
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000852 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200853 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
854 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000855 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000856 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300857 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200858 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300859 value = popen.read().strip()
860 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000861
Xavier de Gayed1415312016-07-22 12:15:29 +0200862 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
863 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000864 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200865 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
866 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300867 it = iter(popen)
868 self.assertEqual(next(it), "line1\n")
869 self.assertEqual(next(it), "line2\n")
870 self.assertEqual(next(it), "line3\n")
871 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000872
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000873 # Verify environ keys and values from the OS are of the
874 # correct str type.
875 def test_keyvalue_types(self):
876 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000877 self.assertEqual(type(key), str)
878 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000879
Christian Heimes90333392007-11-01 19:08:42 +0000880 def test_items(self):
881 for key, value in self._reference().items():
882 self.assertEqual(os.environ.get(key), value)
883
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000884 # Issue 7310
885 def test___repr__(self):
886 """Check that the repr() of os.environ looks like environ({...})."""
887 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000888 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
889 '{!r}: {!r}'.format(key, value)
890 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000891
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000892 def test_get_exec_path(self):
893 defpath_list = os.defpath.split(os.pathsep)
894 test_path = ['/monty', '/python', '', '/flying/circus']
895 test_env = {'PATH': os.pathsep.join(test_path)}
896
897 saved_environ = os.environ
898 try:
899 os.environ = dict(test_env)
900 # Test that defaulting to os.environ works.
901 self.assertSequenceEqual(test_path, os.get_exec_path())
902 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
903 finally:
904 os.environ = saved_environ
905
906 # No PATH environment variable
907 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
908 # Empty PATH environment variable
909 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
910 # Supplied PATH environment variable
911 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
912
Victor Stinnerb745a742010-05-18 17:17:23 +0000913 if os.supports_bytes_environ:
914 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000915 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000916 # ignore BytesWarning warning
917 with warnings.catch_warnings(record=True):
918 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000919 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000920 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000921 pass
922 else:
923 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000924
925 # bytes key and/or value
926 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
927 ['abc'])
928 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
929 ['abc'])
930 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
931 ['abc'])
932
933 @unittest.skipUnless(os.supports_bytes_environ,
934 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000935 def test_environb(self):
936 # os.environ -> os.environb
937 value = 'euro\u20ac'
938 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000939 value_bytes = value.encode(sys.getfilesystemencoding(),
940 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000941 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000942 msg = "U+20AC character is not encodable to %s" % (
943 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000944 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000945 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(os.environ['unicode'], value)
947 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000948
949 # os.environb -> os.environ
950 value = b'\xff'
951 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000953 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000954 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000955
Victor Stinner161e7b32020-01-24 11:53:44 +0100956 def test_putenv_unsetenv(self):
957 name = "PYTHONTESTVAR"
958 value = "testvalue"
959 code = f'import os; print(repr(os.environ.get({name!r})))'
960
961 with support.EnvironmentVarGuard() as env:
962 env.pop(name, None)
963
964 os.putenv(name, value)
965 proc = subprocess.run([sys.executable, '-c', code], check=True,
966 stdout=subprocess.PIPE, text=True)
967 self.assertEqual(proc.stdout.rstrip(), repr(value))
968
969 os.unsetenv(name)
970 proc = subprocess.run([sys.executable, '-c', code], check=True,
971 stdout=subprocess.PIPE, text=True)
972 self.assertEqual(proc.stdout.rstrip(), repr(None))
973
Victor Stinner13ff2452018-01-22 18:32:50 +0100974 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100975 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +0100976 def test_putenv_unsetenv_error(self):
977 # Empty variable name is invalid.
978 # "=" and null character are not allowed in a variable name.
979 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
980 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
981 self.assertRaises((OSError, ValueError), os.unsetenv, name)
982
Victor Stinnerb73dd022020-01-22 21:11:17 +0100983 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +0100984 # On Windows, an environment variable string ("name=value" string)
985 # is limited to 32,767 characters
986 longstr = 'x' * 32_768
987 self.assertRaises(ValueError, os.putenv, longstr, "1")
988 self.assertRaises(ValueError, os.putenv, "X", longstr)
989 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +0100990
Victor Stinner6d101392013-04-14 16:35:04 +0200991 def test_key_type(self):
992 missing = 'missingkey'
993 self.assertNotIn(missing, os.environ)
994
Victor Stinner839e5ea2013-04-14 16:43:03 +0200995 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200996 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200997 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200998 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200999
Victor Stinner839e5ea2013-04-14 16:43:03 +02001000 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001001 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001002 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001003 self.assertTrue(cm.exception.__suppress_context__)
1004
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001005 def _test_environ_iteration(self, collection):
1006 iterator = iter(collection)
1007 new_key = "__new_key__"
1008
1009 next(iterator) # start iteration over os.environ.items
1010
1011 # add a new key in os.environ mapping
1012 os.environ[new_key] = "test_environ_iteration"
1013
1014 try:
1015 next(iterator) # force iteration over modified mapping
1016 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1017 finally:
1018 del os.environ[new_key]
1019
1020 def test_iter_error_when_changing_os_environ(self):
1021 self._test_environ_iteration(os.environ)
1022
1023 def test_iter_error_when_changing_os_environ_items(self):
1024 self._test_environ_iteration(os.environ.items())
1025
1026 def test_iter_error_when_changing_os_environ_values(self):
1027 self._test_environ_iteration(os.environ.values())
1028
Victor Stinner6d101392013-04-14 16:35:04 +02001029
Tim Petersc4e09402003-04-25 07:11:48 +00001030class WalkTests(unittest.TestCase):
1031 """Tests for os.walk()."""
1032
Victor Stinner0561c532015-03-12 10:28:24 +01001033 # Wrapper to hide minor differences between os.walk and os.fwalk
1034 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001035 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001036 if 'follow_symlinks' in kwargs:
1037 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001038 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001039
Charles-François Natali7372b062012-02-05 15:15:38 +01001040 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001041 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001042 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001043
1044 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001045 # TESTFN/
1046 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001047 # tmp1
1048 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001049 # tmp2
1050 # SUB11/ no kids
1051 # SUB2/ a file kid and a dirsymlink kid
1052 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001053 # SUB21/ not readable
1054 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001055 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001056 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001057 # broken_link2
1058 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001059 # TEST2/
1060 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001061 self.walk_path = join(support.TESTFN, "TEST1")
1062 self.sub1_path = join(self.walk_path, "SUB1")
1063 self.sub11_path = join(self.sub1_path, "SUB11")
1064 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001065 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001066 tmp1_path = join(self.walk_path, "tmp1")
1067 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001068 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001069 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001070 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001071 t2_path = join(support.TESTFN, "TEST2")
1072 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001073 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001074 broken_link2_path = join(sub2_path, "broken_link2")
1075 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001076
1077 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001078 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001079 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001080 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001081 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001082
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001083 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001084 with open(path, "x") as f:
1085 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001086
Victor Stinner0561c532015-03-12 10:28:24 +01001087 if support.can_symlink():
1088 os.symlink(os.path.abspath(t2_path), self.link_path)
1089 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001090 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1091 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001092 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001093 ["broken_link", "broken_link2", "broken_link3",
1094 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001095 else:
pxinwr3e028b22019-02-15 13:04:47 +08001096 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001097
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001098 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001099 try:
1100 os.listdir(sub21_path)
1101 except PermissionError:
1102 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1103 else:
1104 os.chmod(sub21_path, stat.S_IRWXU)
1105 os.unlink(tmp5_path)
1106 os.rmdir(sub21_path)
1107 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001108
Victor Stinner0561c532015-03-12 10:28:24 +01001109 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001110 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001111 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001112
Tim Petersc4e09402003-04-25 07:11:48 +00001113 self.assertEqual(len(all), 4)
1114 # We can't know which order SUB1 and SUB2 will appear in.
1115 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1116 # flipped: TESTFN, SUB2, SUB1, SUB11
1117 flipped = all[0][1][0] != "SUB1"
1118 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001119 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001120 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001121 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1122 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1123 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1124 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001125
Brett Cannon3f9183b2016-08-26 14:44:48 -07001126 def test_walk_prune(self, walk_path=None):
1127 if walk_path is None:
1128 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001129 # Prune the search.
1130 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001131 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001132 all.append((root, dirs, files))
1133 # Don't descend into SUB1.
1134 if 'SUB1' in dirs:
1135 # Note that this also mutates the dirs we appended to all!
1136 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001137
Victor Stinner0561c532015-03-12 10:28:24 +01001138 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001139 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001140
1141 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001142 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001143 self.assertEqual(all[1], self.sub2_tree)
1144
Brett Cannon3f9183b2016-08-26 14:44:48 -07001145 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001146 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001147
Victor Stinner0561c532015-03-12 10:28:24 +01001148 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001149 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001150 all = list(self.walk(self.walk_path, topdown=False))
1151
Victor Stinner53b0a412016-03-26 01:12:36 +01001152 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001153 # We can't know which order SUB1 and SUB2 will appear in.
1154 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1155 # flipped: SUB2, SUB11, SUB1, TESTFN
1156 flipped = all[3][1][0] != "SUB1"
1157 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001158 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001159 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001160 self.assertEqual(all[3],
1161 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1162 self.assertEqual(all[flipped],
1163 (self.sub11_path, [], []))
1164 self.assertEqual(all[flipped + 1],
1165 (self.sub1_path, ["SUB11"], ["tmp2"]))
1166 self.assertEqual(all[2 - 2 * flipped],
1167 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001168
Victor Stinner0561c532015-03-12 10:28:24 +01001169 def test_walk_symlink(self):
1170 if not support.can_symlink():
1171 self.skipTest("need symlink support")
1172
1173 # Walk, following symlinks.
1174 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1175 for root, dirs, files in walk_it:
1176 if root == self.link_path:
1177 self.assertEqual(dirs, [])
1178 self.assertEqual(files, ["tmp4"])
1179 break
1180 else:
1181 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001182
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001183 def test_walk_bad_dir(self):
1184 # Walk top-down.
1185 errors = []
1186 walk_it = self.walk(self.walk_path, onerror=errors.append)
1187 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001188 self.assertEqual(errors, [])
1189 dir1 = 'SUB1'
1190 path1 = os.path.join(root, dir1)
1191 path1new = os.path.join(root, dir1 + '.new')
1192 os.rename(path1, path1new)
1193 try:
1194 roots = [r for r, d, f in walk_it]
1195 self.assertTrue(errors)
1196 self.assertNotIn(path1, roots)
1197 self.assertNotIn(path1new, roots)
1198 for dir2 in dirs:
1199 if dir2 != dir1:
1200 self.assertIn(os.path.join(root, dir2), roots)
1201 finally:
1202 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001203
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001204 def test_walk_many_open_files(self):
1205 depth = 30
1206 base = os.path.join(support.TESTFN, 'deep')
1207 p = os.path.join(base, *(['d']*depth))
1208 os.makedirs(p)
1209
1210 iters = [self.walk(base, topdown=False) for j in range(100)]
1211 for i in range(depth + 1):
1212 expected = (p, ['d'] if i else [], [])
1213 for it in iters:
1214 self.assertEqual(next(it), expected)
1215 p = os.path.dirname(p)
1216
1217 iters = [self.walk(base, topdown=True) for j in range(100)]
1218 p = base
1219 for i in range(depth + 1):
1220 expected = (p, ['d'] if i < depth else [], [])
1221 for it in iters:
1222 self.assertEqual(next(it), expected)
1223 p = os.path.join(p, 'd')
1224
Charles-François Natali7372b062012-02-05 15:15:38 +01001225
1226@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1227class FwalkTests(WalkTests):
1228 """Tests for os.fwalk()."""
1229
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001230 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001231 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001232 yield (root, dirs, files)
1233
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001234 def fwalk(self, *args, **kwargs):
1235 return os.fwalk(*args, **kwargs)
1236
Larry Hastingsc48fe982012-06-25 04:49:05 -07001237 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1238 """
1239 compare with walk() results.
1240 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001241 walk_kwargs = walk_kwargs.copy()
1242 fwalk_kwargs = fwalk_kwargs.copy()
1243 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1244 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1245 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001246
Charles-François Natali7372b062012-02-05 15:15:38 +01001247 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001248 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001249 expected[root] = (set(dirs), set(files))
1250
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001251 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001252 self.assertIn(root, expected)
1253 self.assertEqual(expected[root], (set(dirs), set(files)))
1254
Larry Hastingsc48fe982012-06-25 04:49:05 -07001255 def test_compare_to_walk(self):
1256 kwargs = {'top': support.TESTFN}
1257 self._compare_to_walk(kwargs, kwargs)
1258
Charles-François Natali7372b062012-02-05 15:15:38 +01001259 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001260 try:
1261 fd = os.open(".", os.O_RDONLY)
1262 walk_kwargs = {'top': support.TESTFN}
1263 fwalk_kwargs = walk_kwargs.copy()
1264 fwalk_kwargs['dir_fd'] = fd
1265 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1266 finally:
1267 os.close(fd)
1268
1269 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001270 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001271 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1272 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001273 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001274 # check that the FD is valid
1275 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001276 # redundant check
1277 os.stat(rootfd)
1278 # check that listdir() returns consistent information
1279 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001280
1281 def test_fd_leak(self):
1282 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1283 # we both check that calling fwalk() a large number of times doesn't
1284 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1285 minfd = os.dup(1)
1286 os.close(minfd)
1287 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001288 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001289 pass
1290 newfd = os.dup(1)
1291 self.addCleanup(os.close, newfd)
1292 self.assertEqual(newfd, minfd)
1293
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001294 # fwalk() keeps file descriptors open
1295 test_walk_many_open_files = None
1296
1297
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001298class BytesWalkTests(WalkTests):
1299 """Tests for os.walk() with bytes."""
1300 def walk(self, top, **kwargs):
1301 if 'follow_symlinks' in kwargs:
1302 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1303 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1304 root = os.fsdecode(broot)
1305 dirs = list(map(os.fsdecode, bdirs))
1306 files = list(map(os.fsdecode, bfiles))
1307 yield (root, dirs, files)
1308 bdirs[:] = list(map(os.fsencode, dirs))
1309 bfiles[:] = list(map(os.fsencode, files))
1310
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001311@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1312class BytesFwalkTests(FwalkTests):
1313 """Tests for os.walk() with bytes."""
1314 def fwalk(self, top='.', *args, **kwargs):
1315 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1316 root = os.fsdecode(broot)
1317 dirs = list(map(os.fsdecode, bdirs))
1318 files = list(map(os.fsdecode, bfiles))
1319 yield (root, dirs, files, topfd)
1320 bdirs[:] = list(map(os.fsencode, dirs))
1321 bfiles[:] = list(map(os.fsencode, files))
1322
Charles-François Natali7372b062012-02-05 15:15:38 +01001323
Guido van Rossume7ba4952007-06-06 23:52:48 +00001324class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001325 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001326 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001327
1328 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001329 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001330 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1331 os.makedirs(path) # Should work
1332 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1333 os.makedirs(path)
1334
1335 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001336 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001337 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1338 os.makedirs(path)
1339 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1340 'dir5', 'dir6')
1341 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001342
Serhiy Storchakae304e332017-03-24 13:27:42 +02001343 def test_mode(self):
1344 with support.temp_umask(0o002):
1345 base = support.TESTFN
1346 parent = os.path.join(base, 'dir1')
1347 path = os.path.join(parent, 'dir2')
1348 os.makedirs(path, 0o555)
1349 self.assertTrue(os.path.exists(path))
1350 self.assertTrue(os.path.isdir(path))
1351 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001352 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1353 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001354
Terry Reedy5a22b652010-12-02 07:05:56 +00001355 def test_exist_ok_existing_directory(self):
1356 path = os.path.join(support.TESTFN, 'dir1')
1357 mode = 0o777
1358 old_mask = os.umask(0o022)
1359 os.makedirs(path, mode)
1360 self.assertRaises(OSError, os.makedirs, path, mode)
1361 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001362 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001363 os.makedirs(path, mode=mode, exist_ok=True)
1364 os.umask(old_mask)
1365
Martin Pantera82642f2015-11-19 04:48:44 +00001366 # Issue #25583: A drive root could raise PermissionError on Windows
1367 os.makedirs(os.path.abspath('/'), exist_ok=True)
1368
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001369 def test_exist_ok_s_isgid_directory(self):
1370 path = os.path.join(support.TESTFN, 'dir1')
1371 S_ISGID = stat.S_ISGID
1372 mode = 0o777
1373 old_mask = os.umask(0o022)
1374 try:
1375 existing_testfn_mode = stat.S_IMODE(
1376 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001377 try:
1378 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001379 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001380 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001381 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1382 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1383 # The os should apply S_ISGID from the parent dir for us, but
1384 # this test need not depend on that behavior. Be explicit.
1385 os.makedirs(path, mode | S_ISGID)
1386 # http://bugs.python.org/issue14992
1387 # Should not fail when the bit is already set.
1388 os.makedirs(path, mode, exist_ok=True)
1389 # remove the bit.
1390 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001391 # May work even when the bit is not already set when demanded.
1392 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001393 finally:
1394 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001395
1396 def test_exist_ok_existing_regular_file(self):
1397 base = support.TESTFN
1398 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001399 with open(path, 'w') as f:
1400 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001401 self.assertRaises(OSError, os.makedirs, path)
1402 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1403 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1404 os.remove(path)
1405
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001406 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001407 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001408 'dir4', 'dir5', 'dir6')
1409 # If the tests failed, the bottom-most directory ('../dir6')
1410 # may not have been created, so we look for the outermost directory
1411 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001412 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001413 path = os.path.dirname(path)
1414
1415 os.removedirs(path)
1416
Andrew Svetlov405faed2012-12-25 12:18:09 +02001417
R David Murrayf2ad1732014-12-25 18:36:56 -05001418@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1419class ChownFileTests(unittest.TestCase):
1420
Berker Peksag036a71b2015-07-21 09:29:48 +03001421 @classmethod
1422 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001423 os.mkdir(support.TESTFN)
1424
1425 def test_chown_uid_gid_arguments_must_be_index(self):
1426 stat = os.stat(support.TESTFN)
1427 uid = stat.st_uid
1428 gid = stat.st_gid
1429 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1430 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1431 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1432 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1433 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1434
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001435 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1436 def test_chown_gid(self):
1437 groups = os.getgroups()
1438 if len(groups) < 2:
1439 self.skipTest("test needs at least 2 groups")
1440
R David Murrayf2ad1732014-12-25 18:36:56 -05001441 gid_1, gid_2 = groups[:2]
1442 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001443
R David Murrayf2ad1732014-12-25 18:36:56 -05001444 os.chown(support.TESTFN, uid, gid_1)
1445 gid = os.stat(support.TESTFN).st_gid
1446 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001447
R David Murrayf2ad1732014-12-25 18:36:56 -05001448 os.chown(support.TESTFN, uid, gid_2)
1449 gid = os.stat(support.TESTFN).st_gid
1450 self.assertEqual(gid, gid_2)
1451
1452 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1453 "test needs root privilege and more than one user")
1454 def test_chown_with_root(self):
1455 uid_1, uid_2 = all_users[:2]
1456 gid = os.stat(support.TESTFN).st_gid
1457 os.chown(support.TESTFN, uid_1, gid)
1458 uid = os.stat(support.TESTFN).st_uid
1459 self.assertEqual(uid, uid_1)
1460 os.chown(support.TESTFN, uid_2, gid)
1461 uid = os.stat(support.TESTFN).st_uid
1462 self.assertEqual(uid, uid_2)
1463
1464 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1465 "test needs non-root account and more than one user")
1466 def test_chown_without_permission(self):
1467 uid_1, uid_2 = all_users[:2]
1468 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001469 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001470 os.chown(support.TESTFN, uid_1, gid)
1471 os.chown(support.TESTFN, uid_2, gid)
1472
Berker Peksag036a71b2015-07-21 09:29:48 +03001473 @classmethod
1474 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001475 os.rmdir(support.TESTFN)
1476
1477
Andrew Svetlov405faed2012-12-25 12:18:09 +02001478class RemoveDirsTests(unittest.TestCase):
1479 def setUp(self):
1480 os.makedirs(support.TESTFN)
1481
1482 def tearDown(self):
1483 support.rmtree(support.TESTFN)
1484
1485 def test_remove_all(self):
1486 dira = os.path.join(support.TESTFN, 'dira')
1487 os.mkdir(dira)
1488 dirb = os.path.join(dira, 'dirb')
1489 os.mkdir(dirb)
1490 os.removedirs(dirb)
1491 self.assertFalse(os.path.exists(dirb))
1492 self.assertFalse(os.path.exists(dira))
1493 self.assertFalse(os.path.exists(support.TESTFN))
1494
1495 def test_remove_partial(self):
1496 dira = os.path.join(support.TESTFN, 'dira')
1497 os.mkdir(dira)
1498 dirb = os.path.join(dira, 'dirb')
1499 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001500 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001501 os.removedirs(dirb)
1502 self.assertFalse(os.path.exists(dirb))
1503 self.assertTrue(os.path.exists(dira))
1504 self.assertTrue(os.path.exists(support.TESTFN))
1505
1506 def test_remove_nothing(self):
1507 dira = os.path.join(support.TESTFN, 'dira')
1508 os.mkdir(dira)
1509 dirb = os.path.join(dira, 'dirb')
1510 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001511 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001512 with self.assertRaises(OSError):
1513 os.removedirs(dirb)
1514 self.assertTrue(os.path.exists(dirb))
1515 self.assertTrue(os.path.exists(dira))
1516 self.assertTrue(os.path.exists(support.TESTFN))
1517
1518
Guido van Rossume7ba4952007-06-06 23:52:48 +00001519class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001520 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001521 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001522 f.write(b'hello')
1523 f.close()
1524 with open(os.devnull, 'rb') as f:
1525 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001526
Andrew Svetlov405faed2012-12-25 12:18:09 +02001527
Guido van Rossume7ba4952007-06-06 23:52:48 +00001528class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001529 def test_urandom_length(self):
1530 self.assertEqual(len(os.urandom(0)), 0)
1531 self.assertEqual(len(os.urandom(1)), 1)
1532 self.assertEqual(len(os.urandom(10)), 10)
1533 self.assertEqual(len(os.urandom(100)), 100)
1534 self.assertEqual(len(os.urandom(1000)), 1000)
1535
1536 def test_urandom_value(self):
1537 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001538 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001539 data2 = os.urandom(16)
1540 self.assertNotEqual(data1, data2)
1541
1542 def get_urandom_subprocess(self, count):
1543 code = '\n'.join((
1544 'import os, sys',
1545 'data = os.urandom(%s)' % count,
1546 'sys.stdout.buffer.write(data)',
1547 'sys.stdout.buffer.flush()'))
1548 out = assert_python_ok('-c', code)
1549 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001550 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001551 return stdout
1552
1553 def test_urandom_subprocess(self):
1554 data1 = self.get_urandom_subprocess(16)
1555 data2 = self.get_urandom_subprocess(16)
1556 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001557
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001558
Victor Stinner9b1f4742016-09-06 16:18:52 -07001559@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1560class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001561 @classmethod
1562 def setUpClass(cls):
1563 try:
1564 os.getrandom(1)
1565 except OSError as exc:
1566 if exc.errno == errno.ENOSYS:
1567 # Python compiled on a more recent Linux version
1568 # than the current Linux kernel
1569 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1570 else:
1571 raise
1572
Victor Stinner9b1f4742016-09-06 16:18:52 -07001573 def test_getrandom_type(self):
1574 data = os.getrandom(16)
1575 self.assertIsInstance(data, bytes)
1576 self.assertEqual(len(data), 16)
1577
1578 def test_getrandom0(self):
1579 empty = os.getrandom(0)
1580 self.assertEqual(empty, b'')
1581
1582 def test_getrandom_random(self):
1583 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1584
1585 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1586 # resource /dev/random
1587
1588 def test_getrandom_nonblock(self):
1589 # The call must not fail. Check also that the flag exists
1590 try:
1591 os.getrandom(1, os.GRND_NONBLOCK)
1592 except BlockingIOError:
1593 # System urandom is not initialized yet
1594 pass
1595
1596 def test_getrandom_value(self):
1597 data1 = os.getrandom(16)
1598 data2 = os.getrandom(16)
1599 self.assertNotEqual(data1, data2)
1600
1601
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001602# os.urandom() doesn't use a file descriptor when it is implemented with the
1603# getentropy() function, the getrandom() function or the getrandom() syscall
1604OS_URANDOM_DONT_USE_FD = (
1605 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1606 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1607 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001608
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001609@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1610 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001611@unittest.skipIf(sys.platform == "vxworks",
1612 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001613class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001614 @unittest.skipUnless(resource, "test requires the resource module")
1615 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001616 # Check urandom() failing when it is not able to open /dev/random.
1617 # We spawn a new process to make the test more robust (if getrlimit()
1618 # failed to restore the file descriptor limit after this, the whole
1619 # test suite would crash; this actually happened on the OS X Tiger
1620 # buildbot).
1621 code = """if 1:
1622 import errno
1623 import os
1624 import resource
1625
1626 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1627 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1628 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001629 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001630 except OSError as e:
1631 assert e.errno == errno.EMFILE, e.errno
1632 else:
1633 raise AssertionError("OSError not raised")
1634 """
1635 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001636
Antoine Pitroue472aea2014-04-26 14:33:03 +02001637 def test_urandom_fd_closed(self):
1638 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1639 # closed.
1640 code = """if 1:
1641 import os
1642 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001643 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001644 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001645 with test.support.SuppressCrashReport():
1646 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001647 sys.stdout.buffer.write(os.urandom(4))
1648 """
1649 rc, out, err = assert_python_ok('-Sc', code)
1650
1651 def test_urandom_fd_reopened(self):
1652 # Issue #21207: urandom() should detect its fd to /dev/urandom
1653 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001654 self.addCleanup(support.unlink, support.TESTFN)
1655 create_file(support.TESTFN, b"x" * 256)
1656
Antoine Pitroue472aea2014-04-26 14:33:03 +02001657 code = """if 1:
1658 import os
1659 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001660 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001661 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001662 with test.support.SuppressCrashReport():
1663 for fd in range(3, 256):
1664 try:
1665 os.close(fd)
1666 except OSError:
1667 pass
1668 else:
1669 # Found the urandom fd (XXX hopefully)
1670 break
1671 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001672 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001673 new_fd = f.fileno()
1674 # Issue #26935: posix allows new_fd and fd to be equal but
1675 # some libc implementations have dup2 return an error in this
1676 # case.
1677 if new_fd != fd:
1678 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001679 sys.stdout.buffer.write(os.urandom(4))
1680 sys.stdout.buffer.write(os.urandom(4))
1681 """.format(TESTFN=support.TESTFN)
1682 rc, out, err = assert_python_ok('-Sc', code)
1683 self.assertEqual(len(out), 8)
1684 self.assertNotEqual(out[0:4], out[4:8])
1685 rc, out2, err2 = assert_python_ok('-Sc', code)
1686 self.assertEqual(len(out2), 8)
1687 self.assertNotEqual(out2, out)
1688
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001689
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001690@contextlib.contextmanager
1691def _execvpe_mockup(defpath=None):
1692 """
1693 Stubs out execv and execve functions when used as context manager.
1694 Records exec calls. The mock execv and execve functions always raise an
1695 exception as they would normally never return.
1696 """
1697 # A list of tuples containing (function name, first arg, args)
1698 # of calls to execv or execve that have been made.
1699 calls = []
1700
1701 def mock_execv(name, *args):
1702 calls.append(('execv', name, args))
1703 raise RuntimeError("execv called")
1704
1705 def mock_execve(name, *args):
1706 calls.append(('execve', name, args))
1707 raise OSError(errno.ENOTDIR, "execve called")
1708
1709 try:
1710 orig_execv = os.execv
1711 orig_execve = os.execve
1712 orig_defpath = os.defpath
1713 os.execv = mock_execv
1714 os.execve = mock_execve
1715 if defpath is not None:
1716 os.defpath = defpath
1717 yield calls
1718 finally:
1719 os.execv = orig_execv
1720 os.execve = orig_execve
1721 os.defpath = orig_defpath
1722
pxinwrf2d7ac72019-05-21 18:46:37 +08001723@unittest.skipUnless(hasattr(os, 'execv'),
1724 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001725class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001726 @unittest.skipIf(USING_LINUXTHREADS,
1727 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001728 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001729 self.assertRaises(OSError, os.execvpe, 'no such app-',
1730 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001731
Steve Dowerbce26262016-11-19 19:17:26 -08001732 def test_execv_with_bad_arglist(self):
1733 self.assertRaises(ValueError, os.execv, 'notepad', ())
1734 self.assertRaises(ValueError, os.execv, 'notepad', [])
1735 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1736 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1737
Thomas Heller6790d602007-08-30 17:15:14 +00001738 def test_execvpe_with_bad_arglist(self):
1739 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001740 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1741 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001742
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001743 @unittest.skipUnless(hasattr(os, '_execvpe'),
1744 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001745 def _test_internal_execvpe(self, test_type):
1746 program_path = os.sep + 'absolutepath'
1747 if test_type is bytes:
1748 program = b'executable'
1749 fullpath = os.path.join(os.fsencode(program_path), program)
1750 native_fullpath = fullpath
1751 arguments = [b'progname', 'arg1', 'arg2']
1752 else:
1753 program = 'executable'
1754 arguments = ['progname', 'arg1', 'arg2']
1755 fullpath = os.path.join(program_path, program)
1756 if os.name != "nt":
1757 native_fullpath = os.fsencode(fullpath)
1758 else:
1759 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001760 env = {'spam': 'beans'}
1761
Victor Stinnerb745a742010-05-18 17:17:23 +00001762 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001763 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001764 self.assertRaises(RuntimeError,
1765 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001766 self.assertEqual(len(calls), 1)
1767 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1768
Victor Stinnerb745a742010-05-18 17:17:23 +00001769 # test os._execvpe() with a relative path:
1770 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001771 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001772 self.assertRaises(OSError,
1773 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001774 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001775 self.assertSequenceEqual(calls[0],
1776 ('execve', native_fullpath, (arguments, env)))
1777
1778 # test os._execvpe() with a relative path:
1779 # os.get_exec_path() reads the 'PATH' variable
1780 with _execvpe_mockup() as calls:
1781 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001782 if test_type is bytes:
1783 env_path[b'PATH'] = program_path
1784 else:
1785 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001786 self.assertRaises(OSError,
1787 os._execvpe, program, arguments, env=env_path)
1788 self.assertEqual(len(calls), 1)
1789 self.assertSequenceEqual(calls[0],
1790 ('execve', native_fullpath, (arguments, env_path)))
1791
1792 def test_internal_execvpe_str(self):
1793 self._test_internal_execvpe(str)
1794 if os.name != "nt":
1795 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001796
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001797 def test_execve_invalid_env(self):
1798 args = [sys.executable, '-c', 'pass']
1799
Ville Skyttä49b27342017-08-03 09:00:59 +03001800 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001801 newenv = os.environ.copy()
1802 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1803 with self.assertRaises(ValueError):
1804 os.execve(args[0], args, newenv)
1805
Ville Skyttä49b27342017-08-03 09:00:59 +03001806 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001807 newenv = os.environ.copy()
1808 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1809 with self.assertRaises(ValueError):
1810 os.execve(args[0], args, newenv)
1811
Ville Skyttä49b27342017-08-03 09:00:59 +03001812 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001813 newenv = os.environ.copy()
1814 newenv["FRUIT=ORANGE"] = "lemon"
1815 with self.assertRaises(ValueError):
1816 os.execve(args[0], args, newenv)
1817
Alexey Izbyshev83460312018-10-20 03:28:22 +03001818 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1819 def test_execve_with_empty_path(self):
1820 # bpo-32890: Check GetLastError() misuse
1821 try:
1822 os.execve('', ['arg'], {})
1823 except OSError as e:
1824 self.assertTrue(e.winerror is None or e.winerror != 0)
1825 else:
1826 self.fail('No OSError raised')
1827
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001828
Serhiy Storchaka43767632013-11-03 21:31:38 +02001829@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001830class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001831 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001832 try:
1833 os.stat(support.TESTFN)
1834 except FileNotFoundError:
1835 exists = False
1836 except OSError as exc:
1837 exists = True
1838 self.fail("file %s must not exist; os.stat failed with %s"
1839 % (support.TESTFN, exc))
1840 else:
1841 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001842
Thomas Wouters477c8d52006-05-27 19:21:47 +00001843 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001844 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001845
1846 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001847 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001848
1849 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001850 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001851
1852 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001853 self.addCleanup(support.unlink, support.TESTFN)
1854
Victor Stinnere77c9742016-03-25 10:28:23 +01001855 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001856 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001857
1858 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001859 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001860
Thomas Wouters477c8d52006-05-27 19:21:47 +00001861 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001862 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001863
Victor Stinnere77c9742016-03-25 10:28:23 +01001864
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001865class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001866 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001867 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1868 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001869 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001870 def get_single(f):
1871 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001872 if hasattr(os, f):
1873 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001874 return helper
1875 for f in singles:
1876 locals()["test_"+f] = get_single(f)
1877
Benjamin Peterson7522c742009-01-19 21:00:09 +00001878 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001879 try:
1880 f(support.make_bad_fd(), *args)
1881 except OSError as e:
1882 self.assertEqual(e.errno, errno.EBADF)
1883 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001884 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001885 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001886
Serhiy Storchaka43767632013-11-03 21:31:38 +02001887 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001888 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001889 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001890
Serhiy Storchaka43767632013-11-03 21:31:38 +02001891 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001892 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001893 fd = support.make_bad_fd()
1894 # Make sure none of the descriptors we are about to close are
1895 # currently valid (issue 6542).
1896 for i in range(10):
1897 try: os.fstat(fd+i)
1898 except OSError:
1899 pass
1900 else:
1901 break
1902 if i < 2:
1903 raise unittest.SkipTest(
1904 "Unable to acquire a range of invalid file descriptors")
1905 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001906
Serhiy Storchaka43767632013-11-03 21:31:38 +02001907 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001908 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001909 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001910
Serhiy Storchaka43767632013-11-03 21:31:38 +02001911 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001912 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001913 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001914
Serhiy Storchaka43767632013-11-03 21:31:38 +02001915 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001916 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001917 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001918
Serhiy Storchaka43767632013-11-03 21:31:38 +02001919 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001920 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001921 self.check(os.pathconf, "PC_NAME_MAX")
1922 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001923
Serhiy Storchaka43767632013-11-03 21:31:38 +02001924 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001925 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001926 self.check(os.truncate, 0)
1927 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001928
Serhiy Storchaka43767632013-11-03 21:31:38 +02001929 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001930 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001931 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001932
Serhiy Storchaka43767632013-11-03 21:31:38 +02001933 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001934 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001935 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001936
Victor Stinner57ddf782014-01-08 15:21:28 +01001937 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1938 def test_readv(self):
1939 buf = bytearray(10)
1940 self.check(os.readv, [buf])
1941
Serhiy Storchaka43767632013-11-03 21:31:38 +02001942 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001943 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001944 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001945
Serhiy Storchaka43767632013-11-03 21:31:38 +02001946 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001947 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001948 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001949
Victor Stinner57ddf782014-01-08 15:21:28 +01001950 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1951 def test_writev(self):
1952 self.check(os.writev, [b'abc'])
1953
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001954 def test_inheritable(self):
1955 self.check(os.get_inheritable)
1956 self.check(os.set_inheritable, True)
1957
1958 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1959 'needs os.get_blocking() and os.set_blocking()')
1960 def test_blocking(self):
1961 self.check(os.get_blocking)
1962 self.check(os.set_blocking, True)
1963
Brian Curtin1b9df392010-11-24 20:24:31 +00001964
1965class LinkTests(unittest.TestCase):
1966 def setUp(self):
1967 self.file1 = support.TESTFN
1968 self.file2 = os.path.join(support.TESTFN + "2")
1969
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001970 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001971 for file in (self.file1, self.file2):
1972 if os.path.exists(file):
1973 os.unlink(file)
1974
Brian Curtin1b9df392010-11-24 20:24:31 +00001975 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001976 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001977
xdegaye6a55d092017-11-12 17:57:04 +01001978 try:
1979 os.link(file1, file2)
1980 except PermissionError as e:
1981 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001982 with open(file1, "r") as f1, open(file2, "r") as f2:
1983 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1984
1985 def test_link(self):
1986 self._test_link(self.file1, self.file2)
1987
1988 def test_link_bytes(self):
1989 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1990 bytes(self.file2, sys.getfilesystemencoding()))
1991
Brian Curtinf498b752010-11-30 15:54:04 +00001992 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001993 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001994 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001995 except UnicodeError:
1996 raise unittest.SkipTest("Unable to encode for this platform.")
1997
Brian Curtinf498b752010-11-30 15:54:04 +00001998 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001999 self.file2 = self.file1 + "2"
2000 self._test_link(self.file1, self.file2)
2001
Serhiy Storchaka43767632013-11-03 21:31:38 +02002002@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2003class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002004 # uid_t and gid_t are 32-bit unsigned integers on Linux
2005 UID_OVERFLOW = (1 << 32)
2006 GID_OVERFLOW = (1 << 32)
2007
Serhiy Storchaka43767632013-11-03 21:31:38 +02002008 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2009 def test_setuid(self):
2010 if os.getuid() != 0:
2011 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002012 self.assertRaises(TypeError, os.setuid, 'not an int')
2013 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002014
Serhiy Storchaka43767632013-11-03 21:31:38 +02002015 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2016 def test_setgid(self):
2017 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2018 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002019 self.assertRaises(TypeError, os.setgid, 'not an int')
2020 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002021
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2023 def test_seteuid(self):
2024 if os.getuid() != 0:
2025 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002026 self.assertRaises(TypeError, os.setegid, 'not an int')
2027 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002028
Serhiy Storchaka43767632013-11-03 21:31:38 +02002029 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2030 def test_setegid(self):
2031 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2032 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002033 self.assertRaises(TypeError, os.setegid, 'not an int')
2034 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002035
Serhiy Storchaka43767632013-11-03 21:31:38 +02002036 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2037 def test_setreuid(self):
2038 if os.getuid() != 0:
2039 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002040 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2041 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2042 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2043 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002044
Serhiy Storchaka43767632013-11-03 21:31:38 +02002045 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2046 def test_setreuid_neg1(self):
2047 # Needs to accept -1. We run this in a subprocess to avoid
2048 # altering the test runner's process state (issue8045).
2049 subprocess.check_call([
2050 sys.executable, '-c',
2051 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002052
Serhiy Storchaka43767632013-11-03 21:31:38 +02002053 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2054 def test_setregid(self):
2055 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2056 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002057 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2058 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2059 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2060 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002061
Serhiy Storchaka43767632013-11-03 21:31:38 +02002062 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2063 def test_setregid_neg1(self):
2064 # Needs to accept -1. We run this in a subprocess to avoid
2065 # altering the test runner's process state (issue8045).
2066 subprocess.check_call([
2067 sys.executable, '-c',
2068 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002069
Serhiy Storchaka43767632013-11-03 21:31:38 +02002070@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2071class Pep383Tests(unittest.TestCase):
2072 def setUp(self):
2073 if support.TESTFN_UNENCODABLE:
2074 self.dir = support.TESTFN_UNENCODABLE
2075 elif support.TESTFN_NONASCII:
2076 self.dir = support.TESTFN_NONASCII
2077 else:
2078 self.dir = support.TESTFN
2079 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002080
Serhiy Storchaka43767632013-11-03 21:31:38 +02002081 bytesfn = []
2082 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002083 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002084 fn = os.fsencode(fn)
2085 except UnicodeEncodeError:
2086 return
2087 bytesfn.append(fn)
2088 add_filename(support.TESTFN_UNICODE)
2089 if support.TESTFN_UNENCODABLE:
2090 add_filename(support.TESTFN_UNENCODABLE)
2091 if support.TESTFN_NONASCII:
2092 add_filename(support.TESTFN_NONASCII)
2093 if not bytesfn:
2094 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002095
Serhiy Storchaka43767632013-11-03 21:31:38 +02002096 self.unicodefn = set()
2097 os.mkdir(self.dir)
2098 try:
2099 for fn in bytesfn:
2100 support.create_empty_file(os.path.join(self.bdir, fn))
2101 fn = os.fsdecode(fn)
2102 if fn in self.unicodefn:
2103 raise ValueError("duplicate filename")
2104 self.unicodefn.add(fn)
2105 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002106 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002107 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002108
Serhiy Storchaka43767632013-11-03 21:31:38 +02002109 def tearDown(self):
2110 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002111
Serhiy Storchaka43767632013-11-03 21:31:38 +02002112 def test_listdir(self):
2113 expected = self.unicodefn
2114 found = set(os.listdir(self.dir))
2115 self.assertEqual(found, expected)
2116 # test listdir without arguments
2117 current_directory = os.getcwd()
2118 try:
2119 os.chdir(os.sep)
2120 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2121 finally:
2122 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002123
Serhiy Storchaka43767632013-11-03 21:31:38 +02002124 def test_open(self):
2125 for fn in self.unicodefn:
2126 f = open(os.path.join(self.dir, fn), 'rb')
2127 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002128
Serhiy Storchaka43767632013-11-03 21:31:38 +02002129 @unittest.skipUnless(hasattr(os, 'statvfs'),
2130 "need os.statvfs()")
2131 def test_statvfs(self):
2132 # issue #9645
2133 for fn in self.unicodefn:
2134 # should not fail with file not found error
2135 fullname = os.path.join(self.dir, fn)
2136 os.statvfs(fullname)
2137
2138 def test_stat(self):
2139 for fn in self.unicodefn:
2140 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002141
Brian Curtineb24d742010-04-12 17:16:38 +00002142@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2143class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002144 def _kill(self, sig):
2145 # Start sys.executable as a subprocess and communicate from the
2146 # subprocess to the parent that the interpreter is ready. When it
2147 # becomes ready, send *sig* via os.kill to the subprocess and check
2148 # that the return code is equal to *sig*.
2149 import ctypes
2150 from ctypes import wintypes
2151 import msvcrt
2152
2153 # Since we can't access the contents of the process' stdout until the
2154 # process has exited, use PeekNamedPipe to see what's inside stdout
2155 # without waiting. This is done so we can tell that the interpreter
2156 # is started and running at a point where it could handle a signal.
2157 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2158 PeekNamedPipe.restype = wintypes.BOOL
2159 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2160 ctypes.POINTER(ctypes.c_char), # stdout buf
2161 wintypes.DWORD, # Buffer size
2162 ctypes.POINTER(wintypes.DWORD), # bytes read
2163 ctypes.POINTER(wintypes.DWORD), # bytes avail
2164 ctypes.POINTER(wintypes.DWORD)) # bytes left
2165 msg = "running"
2166 proc = subprocess.Popen([sys.executable, "-c",
2167 "import sys;"
2168 "sys.stdout.write('{}');"
2169 "sys.stdout.flush();"
2170 "input()".format(msg)],
2171 stdout=subprocess.PIPE,
2172 stderr=subprocess.PIPE,
2173 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002174 self.addCleanup(proc.stdout.close)
2175 self.addCleanup(proc.stderr.close)
2176 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002177
2178 count, max = 0, 100
2179 while count < max and proc.poll() is None:
2180 # Create a string buffer to store the result of stdout from the pipe
2181 buf = ctypes.create_string_buffer(len(msg))
2182 # Obtain the text currently in proc.stdout
2183 # Bytes read/avail/left are left as NULL and unused
2184 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2185 buf, ctypes.sizeof(buf), None, None, None)
2186 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2187 if buf.value:
2188 self.assertEqual(msg, buf.value.decode())
2189 break
2190 time.sleep(0.1)
2191 count += 1
2192 else:
2193 self.fail("Did not receive communication from the subprocess")
2194
Brian Curtineb24d742010-04-12 17:16:38 +00002195 os.kill(proc.pid, sig)
2196 self.assertEqual(proc.wait(), sig)
2197
2198 def test_kill_sigterm(self):
2199 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002200 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002201
2202 def test_kill_int(self):
2203 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002204 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002205
2206 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002207 tagname = "test_os_%s" % uuid.uuid1()
2208 m = mmap.mmap(-1, 1, tagname)
2209 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002210 # Run a script which has console control handling enabled.
2211 proc = subprocess.Popen([sys.executable,
2212 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002213 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002214 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2215 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002216 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002217 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002218 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002219 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002220 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002221 count += 1
2222 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002223 # Forcefully kill the process if we weren't able to signal it.
2224 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002225 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002226 os.kill(proc.pid, event)
2227 # proc.send_signal(event) could also be done here.
2228 # Allow time for the signal to be passed and the process to exit.
2229 time.sleep(0.5)
2230 if not proc.poll():
2231 # Forcefully kill the process if we weren't able to signal it.
2232 os.kill(proc.pid, signal.SIGINT)
2233 self.fail("subprocess did not stop on {}".format(name))
2234
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002235 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002236 def test_CTRL_C_EVENT(self):
2237 from ctypes import wintypes
2238 import ctypes
2239
2240 # Make a NULL value by creating a pointer with no argument.
2241 NULL = ctypes.POINTER(ctypes.c_int)()
2242 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2243 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2244 wintypes.BOOL)
2245 SetConsoleCtrlHandler.restype = wintypes.BOOL
2246
2247 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002248 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002249 # by subprocesses.
2250 SetConsoleCtrlHandler(NULL, 0)
2251
2252 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2253
2254 def test_CTRL_BREAK_EVENT(self):
2255 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2256
2257
Brian Curtind40e6f72010-07-08 21:39:08 +00002258@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002259class Win32ListdirTests(unittest.TestCase):
2260 """Test listdir on Windows."""
2261
2262 def setUp(self):
2263 self.created_paths = []
2264 for i in range(2):
2265 dir_name = 'SUB%d' % i
2266 dir_path = os.path.join(support.TESTFN, dir_name)
2267 file_name = 'FILE%d' % i
2268 file_path = os.path.join(support.TESTFN, file_name)
2269 os.makedirs(dir_path)
2270 with open(file_path, 'w') as f:
2271 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2272 self.created_paths.extend([dir_name, file_name])
2273 self.created_paths.sort()
2274
2275 def tearDown(self):
2276 shutil.rmtree(support.TESTFN)
2277
2278 def test_listdir_no_extended_path(self):
2279 """Test when the path is not an "extended" path."""
2280 # unicode
2281 self.assertEqual(
2282 sorted(os.listdir(support.TESTFN)),
2283 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002284
Tim Golden781bbeb2013-10-25 20:24:06 +01002285 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002286 self.assertEqual(
2287 sorted(os.listdir(os.fsencode(support.TESTFN))),
2288 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002289
2290 def test_listdir_extended_path(self):
2291 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002292 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002293 # unicode
2294 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2295 self.assertEqual(
2296 sorted(os.listdir(path)),
2297 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002298
Tim Golden781bbeb2013-10-25 20:24:06 +01002299 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002300 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2301 self.assertEqual(
2302 sorted(os.listdir(path)),
2303 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002304
2305
Berker Peksage0b5b202018-08-15 13:03:41 +03002306@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2307class ReadlinkTests(unittest.TestCase):
2308 filelink = 'readlinktest'
2309 filelink_target = os.path.abspath(__file__)
2310 filelinkb = os.fsencode(filelink)
2311 filelinkb_target = os.fsencode(filelink_target)
2312
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002313 def assertPathEqual(self, left, right):
2314 left = os.path.normcase(left)
2315 right = os.path.normcase(right)
2316 if sys.platform == 'win32':
2317 # Bad practice to blindly strip the prefix as it may be required to
2318 # correctly refer to the file, but we're only comparing paths here.
2319 has_prefix = lambda p: p.startswith(
2320 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2321 if has_prefix(left):
2322 left = left[4:]
2323 if has_prefix(right):
2324 right = right[4:]
2325 self.assertEqual(left, right)
2326
Berker Peksage0b5b202018-08-15 13:03:41 +03002327 def setUp(self):
2328 self.assertTrue(os.path.exists(self.filelink_target))
2329 self.assertTrue(os.path.exists(self.filelinkb_target))
2330 self.assertFalse(os.path.exists(self.filelink))
2331 self.assertFalse(os.path.exists(self.filelinkb))
2332
2333 def test_not_symlink(self):
2334 filelink_target = FakePath(self.filelink_target)
2335 self.assertRaises(OSError, os.readlink, self.filelink_target)
2336 self.assertRaises(OSError, os.readlink, filelink_target)
2337
2338 def test_missing_link(self):
2339 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2340 self.assertRaises(FileNotFoundError, os.readlink,
2341 FakePath('missing-link'))
2342
2343 @support.skip_unless_symlink
2344 def test_pathlike(self):
2345 os.symlink(self.filelink_target, self.filelink)
2346 self.addCleanup(support.unlink, self.filelink)
2347 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002348 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002349
2350 @support.skip_unless_symlink
2351 def test_pathlike_bytes(self):
2352 os.symlink(self.filelinkb_target, self.filelinkb)
2353 self.addCleanup(support.unlink, self.filelinkb)
2354 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002355 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002356 self.assertIsInstance(path, bytes)
2357
2358 @support.skip_unless_symlink
2359 def test_bytes(self):
2360 os.symlink(self.filelinkb_target, self.filelinkb)
2361 self.addCleanup(support.unlink, self.filelinkb)
2362 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002363 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002364 self.assertIsInstance(path, bytes)
2365
2366
Tim Golden781bbeb2013-10-25 20:24:06 +01002367@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002368@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002369class Win32SymlinkTests(unittest.TestCase):
2370 filelink = 'filelinktest'
2371 filelink_target = os.path.abspath(__file__)
2372 dirlink = 'dirlinktest'
2373 dirlink_target = os.path.dirname(filelink_target)
2374 missing_link = 'missing link'
2375
2376 def setUp(self):
2377 assert os.path.exists(self.dirlink_target)
2378 assert os.path.exists(self.filelink_target)
2379 assert not os.path.exists(self.dirlink)
2380 assert not os.path.exists(self.filelink)
2381 assert not os.path.exists(self.missing_link)
2382
2383 def tearDown(self):
2384 if os.path.exists(self.filelink):
2385 os.remove(self.filelink)
2386 if os.path.exists(self.dirlink):
2387 os.rmdir(self.dirlink)
2388 if os.path.lexists(self.missing_link):
2389 os.remove(self.missing_link)
2390
2391 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002392 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002393 self.assertTrue(os.path.exists(self.dirlink))
2394 self.assertTrue(os.path.isdir(self.dirlink))
2395 self.assertTrue(os.path.islink(self.dirlink))
2396 self.check_stat(self.dirlink, self.dirlink_target)
2397
2398 def test_file_link(self):
2399 os.symlink(self.filelink_target, self.filelink)
2400 self.assertTrue(os.path.exists(self.filelink))
2401 self.assertTrue(os.path.isfile(self.filelink))
2402 self.assertTrue(os.path.islink(self.filelink))
2403 self.check_stat(self.filelink, self.filelink_target)
2404
2405 def _create_missing_dir_link(self):
2406 'Create a "directory" link to a non-existent target'
2407 linkname = self.missing_link
2408 if os.path.lexists(linkname):
2409 os.remove(linkname)
2410 target = r'c:\\target does not exist.29r3c740'
2411 assert not os.path.exists(target)
2412 target_is_dir = True
2413 os.symlink(target, linkname, target_is_dir)
2414
2415 def test_remove_directory_link_to_missing_target(self):
2416 self._create_missing_dir_link()
2417 # For compatibility with Unix, os.remove will check the
2418 # directory status and call RemoveDirectory if the symlink
2419 # was created with target_is_dir==True.
2420 os.remove(self.missing_link)
2421
Brian Curtind40e6f72010-07-08 21:39:08 +00002422 def test_isdir_on_directory_link_to_missing_target(self):
2423 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002424 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002425
Brian Curtind40e6f72010-07-08 21:39:08 +00002426 def test_rmdir_on_directory_link_to_missing_target(self):
2427 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002428 os.rmdir(self.missing_link)
2429
2430 def check_stat(self, link, target):
2431 self.assertEqual(os.stat(link), os.stat(target))
2432 self.assertNotEqual(os.lstat(link), os.stat(link))
2433
Brian Curtind25aef52011-06-13 15:16:04 -05002434 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002435 self.assertEqual(os.stat(bytes_link), os.stat(target))
2436 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002437
2438 def test_12084(self):
2439 level1 = os.path.abspath(support.TESTFN)
2440 level2 = os.path.join(level1, "level2")
2441 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002442 self.addCleanup(support.rmtree, level1)
2443
2444 os.mkdir(level1)
2445 os.mkdir(level2)
2446 os.mkdir(level3)
2447
2448 file1 = os.path.abspath(os.path.join(level1, "file1"))
2449 create_file(file1)
2450
2451 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002452 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002453 os.chdir(level2)
2454 link = os.path.join(level2, "link")
2455 os.symlink(os.path.relpath(file1), "link")
2456 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002457
Victor Stinnerae39d232016-03-24 17:12:55 +01002458 # Check os.stat calls from the same dir as the link
2459 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002460
Victor Stinnerae39d232016-03-24 17:12:55 +01002461 # Check os.stat calls from a dir below the link
2462 os.chdir(level1)
2463 self.assertEqual(os.stat(file1),
2464 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002465
Victor Stinnerae39d232016-03-24 17:12:55 +01002466 # Check os.stat calls from a dir above the link
2467 os.chdir(level3)
2468 self.assertEqual(os.stat(file1),
2469 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002470 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002471 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002472
SSE43c34aad2018-02-13 00:10:35 +07002473 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2474 and os.path.exists(r'C:\ProgramData'),
2475 'Test directories not found')
2476 def test_29248(self):
2477 # os.symlink() calls CreateSymbolicLink, which creates
2478 # the reparse data buffer with the print name stored
2479 # first, so the offset is always 0. CreateSymbolicLink
2480 # stores the "PrintName" DOS path (e.g. "C:\") first,
2481 # with an offset of 0, followed by the "SubstituteName"
2482 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2483 # the other hand, seems to have been created manually
2484 # with an inverted order.
2485 target = os.readlink(r'C:\Users\All Users')
2486 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2487
Steve Dower6921e732018-03-05 14:26:08 -08002488 def test_buffer_overflow(self):
2489 # Older versions would have a buffer overflow when detecting
2490 # whether a link source was a directory. This test ensures we
2491 # no longer crash, but does not otherwise validate the behavior
2492 segment = 'X' * 27
2493 path = os.path.join(*[segment] * 10)
2494 test_cases = [
2495 # overflow with absolute src
2496 ('\\' + path, segment),
2497 # overflow dest with relative src
2498 (segment, path),
2499 # overflow when joining src
2500 (path[:180], path[:180]),
2501 ]
2502 for src, dest in test_cases:
2503 try:
2504 os.symlink(src, dest)
2505 except FileNotFoundError:
2506 pass
2507 else:
2508 try:
2509 os.remove(dest)
2510 except OSError:
2511 pass
2512 # Also test with bytes, since that is a separate code path.
2513 try:
2514 os.symlink(os.fsencode(src), os.fsencode(dest))
2515 except FileNotFoundError:
2516 pass
2517 else:
2518 try:
2519 os.remove(dest)
2520 except OSError:
2521 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002522
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002523 def test_appexeclink(self):
2524 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002525 if not os.path.isdir(root):
2526 self.skipTest("test requires a WindowsApps directory")
2527
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002528 aliases = [os.path.join(root, a)
2529 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2530
2531 for alias in aliases:
2532 if support.verbose:
2533 print()
2534 print("Testing with", alias)
2535 st = os.lstat(alias)
2536 self.assertEqual(st, os.stat(alias))
2537 self.assertFalse(stat.S_ISLNK(st.st_mode))
2538 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2539 # testing the first one we see is sufficient
2540 break
2541 else:
2542 self.skipTest("test requires an app execution alias")
2543
Tim Golden0321cf22014-05-05 19:46:17 +01002544@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2545class Win32JunctionTests(unittest.TestCase):
2546 junction = 'junctiontest'
2547 junction_target = os.path.dirname(os.path.abspath(__file__))
2548
2549 def setUp(self):
2550 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002551 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002552
2553 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002554 if os.path.lexists(self.junction):
2555 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002556
2557 def test_create_junction(self):
2558 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002559 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002560 self.assertTrue(os.path.exists(self.junction))
2561 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002562 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2563 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002564
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002565 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002566 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002567 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2568 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002569
2570 def test_unlink_removes_junction(self):
2571 _winapi.CreateJunction(self.junction_target, self.junction)
2572 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002573 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002574
2575 os.unlink(self.junction)
2576 self.assertFalse(os.path.exists(self.junction))
2577
Mark Becwarb82bfac2019-02-02 16:08:23 -05002578@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2579class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002580 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002581 nt = support.import_module('nt')
2582 ctypes = support.import_module('ctypes')
2583 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002584
2585 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2586 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2587
2588 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2589 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2590 ctypes.wintypes.LPDWORD)
2591
2592 # This is a pseudo-handle that doesn't need to be closed
2593 hproc = kernel.GetCurrentProcess()
2594
2595 handle_count = ctypes.wintypes.DWORD()
2596 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2597 self.assertEqual(1, ok)
2598
2599 before_count = handle_count.value
2600
2601 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002602 filenames = [
2603 r'\\?\C:',
2604 r'\\?\NUL',
2605 r'\\?\CONIN',
2606 __file__,
2607 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002608
Berker Peksag6ef726a2019-04-22 18:46:28 +03002609 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002610 for name in filenames:
2611 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002612 nt._getfinalpathname(name)
2613 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002614 # Failure is expected
2615 pass
2616 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002617 os.stat(name)
2618 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002619 pass
2620
2621 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2622 self.assertEqual(1, ok)
2623
2624 handle_delta = handle_count.value - before_count
2625
2626 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002627
Jason R. Coombs3a092862013-05-27 23:21:28 -04002628@support.skip_unless_symlink
2629class NonLocalSymlinkTests(unittest.TestCase):
2630
2631 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002632 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002633 Create this structure:
2634
2635 base
2636 \___ some_dir
2637 """
2638 os.makedirs('base/some_dir')
2639
2640 def tearDown(self):
2641 shutil.rmtree('base')
2642
2643 def test_directory_link_nonlocal(self):
2644 """
2645 The symlink target should resolve relative to the link, not relative
2646 to the current directory.
2647
2648 Then, link base/some_link -> base/some_dir and ensure that some_link
2649 is resolved as a directory.
2650
2651 In issue13772, it was discovered that directory detection failed if
2652 the symlink target was not specified relative to the current
2653 directory, which was a defect in the implementation.
2654 """
2655 src = os.path.join('base', 'some_link')
2656 os.symlink('some_dir', src)
2657 assert os.path.isdir(src)
2658
2659
Victor Stinnere8d51452010-08-19 01:05:19 +00002660class FSEncodingTests(unittest.TestCase):
2661 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002662 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2663 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002664
Victor Stinnere8d51452010-08-19 01:05:19 +00002665 def test_identity(self):
2666 # assert fsdecode(fsencode(x)) == x
2667 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2668 try:
2669 bytesfn = os.fsencode(fn)
2670 except UnicodeEncodeError:
2671 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002673
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002674
Brett Cannonefb00c02012-02-29 18:31:31 -05002675
2676class DeviceEncodingTests(unittest.TestCase):
2677
2678 def test_bad_fd(self):
2679 # Return None when an fd doesn't actually exist.
2680 self.assertIsNone(os.device_encoding(123456))
2681
Paul Monson62dfd7d2019-04-25 11:36:45 -07002682 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002683 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002684 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002685 def test_device_encoding(self):
2686 encoding = os.device_encoding(0)
2687 self.assertIsNotNone(encoding)
2688 self.assertTrue(codecs.lookup(encoding))
2689
2690
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002691class PidTests(unittest.TestCase):
2692 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2693 def test_getppid(self):
2694 p = subprocess.Popen([sys.executable, '-c',
2695 'import os; print(os.getppid())'],
2696 stdout=subprocess.PIPE)
2697 stdout, _ = p.communicate()
2698 # We are the parent of our subprocess
2699 self.assertEqual(int(stdout), os.getpid())
2700
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002701 def test_waitpid(self):
2702 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002703 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002704 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002705 status = os.waitpid(pid, 0)
2706 self.assertEqual(status, (pid, 0))
2707
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002708
Victor Stinner4659ccf2016-09-14 10:57:00 +02002709class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002710 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002711 self.exitcode = 17
2712
2713 filename = support.TESTFN
2714 self.addCleanup(support.unlink, filename)
2715
2716 if not with_env:
2717 code = 'import sys; sys.exit(%s)' % self.exitcode
2718 else:
2719 self.env = dict(os.environ)
2720 # create an unique key
2721 self.key = str(uuid.uuid4())
2722 self.env[self.key] = self.key
2723 # read the variable from os.environ to check that it exists
2724 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2725 % (self.key, self.exitcode))
2726
2727 with open(filename, "w") as fp:
2728 fp.write(code)
2729
Berker Peksag81816462016-09-15 20:19:47 +03002730 args = [sys.executable, filename]
2731 if use_bytes:
2732 args = [os.fsencode(a) for a in args]
2733 self.env = {os.fsencode(k): os.fsencode(v)
2734 for k, v in self.env.items()}
2735
2736 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002737
Berker Peksag4af23d72016-09-15 20:32:44 +03002738 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002739 def test_spawnl(self):
2740 args = self.create_args()
2741 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2742 self.assertEqual(exitcode, self.exitcode)
2743
Berker Peksag4af23d72016-09-15 20:32:44 +03002744 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002745 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002746 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002747 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2748 self.assertEqual(exitcode, self.exitcode)
2749
Berker Peksag4af23d72016-09-15 20:32:44 +03002750 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002751 def test_spawnlp(self):
2752 args = self.create_args()
2753 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2754 self.assertEqual(exitcode, self.exitcode)
2755
Berker Peksag4af23d72016-09-15 20:32:44 +03002756 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002757 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002758 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002759 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2760 self.assertEqual(exitcode, self.exitcode)
2761
Berker Peksag4af23d72016-09-15 20:32:44 +03002762 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002763 def test_spawnv(self):
2764 args = self.create_args()
2765 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2766 self.assertEqual(exitcode, self.exitcode)
2767
Berker Peksag4af23d72016-09-15 20:32:44 +03002768 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002769 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002770 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002771 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2772 self.assertEqual(exitcode, self.exitcode)
2773
Berker Peksag4af23d72016-09-15 20:32:44 +03002774 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002775 def test_spawnvp(self):
2776 args = self.create_args()
2777 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2778 self.assertEqual(exitcode, self.exitcode)
2779
Berker Peksag4af23d72016-09-15 20:32:44 +03002780 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002781 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002782 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002783 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2784 self.assertEqual(exitcode, self.exitcode)
2785
Berker Peksag4af23d72016-09-15 20:32:44 +03002786 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002787 def test_nowait(self):
2788 args = self.create_args()
2789 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2790 result = os.waitpid(pid, 0)
2791 self.assertEqual(result[0], pid)
2792 status = result[1]
2793 if hasattr(os, 'WIFEXITED'):
2794 self.assertTrue(os.WIFEXITED(status))
2795 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2796 else:
2797 self.assertEqual(status, self.exitcode << 8)
2798
Berker Peksag4af23d72016-09-15 20:32:44 +03002799 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002800 def test_spawnve_bytes(self):
2801 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2802 args = self.create_args(with_env=True, use_bytes=True)
2803 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2804 self.assertEqual(exitcode, self.exitcode)
2805
Steve Dower859fd7b2016-11-19 18:53:19 -08002806 @requires_os_func('spawnl')
2807 def test_spawnl_noargs(self):
2808 args = self.create_args()
2809 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002810 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002811
2812 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002813 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002814 args = self.create_args()
2815 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002816 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002817
2818 @requires_os_func('spawnv')
2819 def test_spawnv_noargs(self):
2820 args = self.create_args()
2821 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2822 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002823 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2824 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002825
2826 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002827 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002828 args = self.create_args()
2829 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2830 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002831 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2832 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002833
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002834 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002835 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002836
Ville Skyttä49b27342017-08-03 09:00:59 +03002837 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002838 newenv = os.environ.copy()
2839 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2840 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002841 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002842 except ValueError:
2843 pass
2844 else:
2845 self.assertEqual(exitcode, 127)
2846
Ville Skyttä49b27342017-08-03 09:00:59 +03002847 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002848 newenv = os.environ.copy()
2849 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2850 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002851 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002852 except ValueError:
2853 pass
2854 else:
2855 self.assertEqual(exitcode, 127)
2856
Ville Skyttä49b27342017-08-03 09:00:59 +03002857 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002858 newenv = os.environ.copy()
2859 newenv["FRUIT=ORANGE"] = "lemon"
2860 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002861 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002862 except ValueError:
2863 pass
2864 else:
2865 self.assertEqual(exitcode, 127)
2866
Ville Skyttä49b27342017-08-03 09:00:59 +03002867 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002868 filename = support.TESTFN
2869 self.addCleanup(support.unlink, filename)
2870 with open(filename, "w") as fp:
2871 fp.write('import sys, os\n'
2872 'if os.getenv("FRUIT") != "orange=lemon":\n'
2873 ' raise AssertionError')
2874 args = [sys.executable, filename]
2875 newenv = os.environ.copy()
2876 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002877 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002878 self.assertEqual(exitcode, 0)
2879
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002880 @requires_os_func('spawnve')
2881 def test_spawnve_invalid_env(self):
2882 self._test_invalid_env(os.spawnve)
2883
2884 @requires_os_func('spawnvpe')
2885 def test_spawnvpe_invalid_env(self):
2886 self._test_invalid_env(os.spawnvpe)
2887
Serhiy Storchaka77703942017-06-25 07:33:01 +03002888
Brian Curtin0151b8e2010-09-24 13:43:43 +00002889# The introduction of this TestCase caused at least two different errors on
2890# *nix buildbots. Temporarily skip this to let the buildbots move along.
2891@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002892@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2893class LoginTests(unittest.TestCase):
2894 def test_getlogin(self):
2895 user_name = os.getlogin()
2896 self.assertNotEqual(len(user_name), 0)
2897
2898
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002899@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2900 "needs os.getpriority and os.setpriority")
2901class ProgramPriorityTests(unittest.TestCase):
2902 """Tests for os.getpriority() and os.setpriority()."""
2903
2904 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002905
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002906 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2907 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2908 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002909 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2910 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002911 raise unittest.SkipTest("unable to reliably test setpriority "
2912 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002913 else:
2914 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002915 finally:
2916 try:
2917 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2918 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002919 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002920 raise
2921
2922
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002923class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002924
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002925 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002926
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 def __init__(self, conn):
2928 asynchat.async_chat.__init__(self, conn)
2929 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002930 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 self.closed = False
2932 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002933
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 def handle_read(self):
2935 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002936 if self.accumulate:
2937 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002938
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002939 def get_data(self):
2940 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002941
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002942 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002943 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002944 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002945
2946 def handle_error(self):
2947 raise
2948
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 def __init__(self, address):
2950 threading.Thread.__init__(self)
2951 asyncore.dispatcher.__init__(self)
2952 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2953 self.bind(address)
2954 self.listen(5)
2955 self.host, self.port = self.socket.getsockname()[:2]
2956 self.handler_instance = None
2957 self._active = False
2958 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002959
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002960 # --- public API
2961
2962 @property
2963 def running(self):
2964 return self._active
2965
2966 def start(self):
2967 assert not self.running
2968 self.__flag = threading.Event()
2969 threading.Thread.start(self)
2970 self.__flag.wait()
2971
2972 def stop(self):
2973 assert self.running
2974 self._active = False
2975 self.join()
2976
2977 def wait(self):
2978 # wait for handler connection to be closed, then stop the server
2979 while not getattr(self.handler_instance, "closed", False):
2980 time.sleep(0.001)
2981 self.stop()
2982
2983 # --- internals
2984
2985 def run(self):
2986 self._active = True
2987 self.__flag.set()
2988 while self._active and asyncore.socket_map:
2989 self._active_lock.acquire()
2990 asyncore.loop(timeout=0.001, count=1)
2991 self._active_lock.release()
2992 asyncore.close_all()
2993
2994 def handle_accept(self):
2995 conn, addr = self.accept()
2996 self.handler_instance = self.Handler(conn)
2997
2998 def handle_connect(self):
2999 self.close()
3000 handle_read = handle_connect
3001
3002 def writable(self):
3003 return 0
3004
3005 def handle_error(self):
3006 raise
3007
3008
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003009@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3010class TestSendfile(unittest.TestCase):
3011
Victor Stinner8c663fd2017-11-08 14:44:44 -08003012 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003013 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003014 not sys.platform.startswith("solaris") and \
3015 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003016 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3017 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003018 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3019 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003020
3021 @classmethod
3022 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003023 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01003024 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003025
3026 @classmethod
3027 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003028 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003029 support.unlink(support.TESTFN)
3030
3031 def setUp(self):
3032 self.server = SendfileTestServer((support.HOST, 0))
3033 self.server.start()
3034 self.client = socket.socket()
3035 self.client.connect((self.server.host, self.server.port))
3036 self.client.settimeout(1)
3037 # synchronize by waiting for "220 ready" response
3038 self.client.recv(1024)
3039 self.sockno = self.client.fileno()
3040 self.file = open(support.TESTFN, 'rb')
3041 self.fileno = self.file.fileno()
3042
3043 def tearDown(self):
3044 self.file.close()
3045 self.client.close()
3046 if self.server.running:
3047 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003048 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003049
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003050 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003051 """A higher level wrapper representing how an application is
3052 supposed to use sendfile().
3053 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003054 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003055 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003056 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003057 except OSError as err:
3058 if err.errno == errno.ECONNRESET:
3059 # disconnected
3060 raise
3061 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3062 # we have to retry send data
3063 continue
3064 else:
3065 raise
3066
3067 def test_send_whole_file(self):
3068 # normal send
3069 total_sent = 0
3070 offset = 0
3071 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003072 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003073 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3074 if sent == 0:
3075 break
3076 offset += sent
3077 total_sent += sent
3078 self.assertTrue(sent <= nbytes)
3079 self.assertEqual(offset, total_sent)
3080
3081 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003082 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003083 self.client.close()
3084 self.server.wait()
3085 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003086 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003087 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003088
3089 def test_send_at_certain_offset(self):
3090 # start sending a file at a certain offset
3091 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003092 offset = len(self.DATA) // 2
3093 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003094 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003095 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003096 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3097 if sent == 0:
3098 break
3099 offset += sent
3100 total_sent += sent
3101 self.assertTrue(sent <= nbytes)
3102
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003103 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003104 self.client.close()
3105 self.server.wait()
3106 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003107 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003108 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003109 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003110 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003111
3112 def test_offset_overflow(self):
3113 # specify an offset > file size
3114 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003115 try:
3116 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3117 except OSError as e:
3118 # Solaris can raise EINVAL if offset >= file length, ignore.
3119 if e.errno != errno.EINVAL:
3120 raise
3121 else:
3122 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003123 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003124 self.client.close()
3125 self.server.wait()
3126 data = self.server.handler_instance.get_data()
3127 self.assertEqual(data, b'')
3128
3129 def test_invalid_offset(self):
3130 with self.assertRaises(OSError) as cm:
3131 os.sendfile(self.sockno, self.fileno, -1, 4096)
3132 self.assertEqual(cm.exception.errno, errno.EINVAL)
3133
Martin Panterbf19d162015-09-09 01:01:13 +00003134 def test_keywords(self):
3135 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003136 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3137 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003138 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003139 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3140 offset=0, count=4096,
3141 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003142
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003143 # --- headers / trailers tests
3144
Serhiy Storchaka43767632013-11-03 21:31:38 +02003145 @requires_headers_trailers
3146 def test_headers(self):
3147 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003148 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003149 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003150 headers=[b"x" * 512, b"y" * 256])
3151 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003152 total_sent += sent
3153 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003154 while total_sent < len(expected_data):
3155 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003156 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3157 offset, nbytes)
3158 if sent == 0:
3159 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003160 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003161 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003162 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003163
Serhiy Storchaka43767632013-11-03 21:31:38 +02003164 self.assertEqual(total_sent, len(expected_data))
3165 self.client.close()
3166 self.server.wait()
3167 data = self.server.handler_instance.get_data()
3168 self.assertEqual(hash(data), hash(expected_data))
3169
3170 @requires_headers_trailers
3171 def test_trailers(self):
3172 TESTFN2 = support.TESTFN + "2"
3173 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003174
3175 self.addCleanup(support.unlink, TESTFN2)
3176 create_file(TESTFN2, file_data)
3177
3178 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003179 os.sendfile(self.sockno, f.fileno(), 0, 5,
3180 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003181 self.client.close()
3182 self.server.wait()
3183 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003184 self.assertEqual(data, b"abcde123456789")
3185
3186 @requires_headers_trailers
3187 @requires_32b
3188 def test_headers_overflow_32bits(self):
3189 self.server.handler_instance.accumulate = False
3190 with self.assertRaises(OSError) as cm:
3191 os.sendfile(self.sockno, self.fileno, 0, 0,
3192 headers=[b"x" * 2**16] * 2**15)
3193 self.assertEqual(cm.exception.errno, errno.EINVAL)
3194
3195 @requires_headers_trailers
3196 @requires_32b
3197 def test_trailers_overflow_32bits(self):
3198 self.server.handler_instance.accumulate = False
3199 with self.assertRaises(OSError) as cm:
3200 os.sendfile(self.sockno, self.fileno, 0, 0,
3201 trailers=[b"x" * 2**16] * 2**15)
3202 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003203
Serhiy Storchaka43767632013-11-03 21:31:38 +02003204 @requires_headers_trailers
3205 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3206 'test needs os.SF_NODISKIO')
3207 def test_flags(self):
3208 try:
3209 os.sendfile(self.sockno, self.fileno, 0, 4096,
3210 flags=os.SF_NODISKIO)
3211 except OSError as err:
3212 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3213 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003214
3215
Larry Hastings9cf065c2012-06-22 16:30:09 -07003216def supports_extended_attributes():
3217 if not hasattr(os, "setxattr"):
3218 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003219
Larry Hastings9cf065c2012-06-22 16:30:09 -07003220 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003221 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003222 try:
3223 os.setxattr(fp.fileno(), b"user.test", b"")
3224 except OSError:
3225 return False
3226 finally:
3227 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003228
3229 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003230
3231
3232@unittest.skipUnless(supports_extended_attributes(),
3233 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003234# Kernels < 2.6.39 don't respect setxattr flags.
3235@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003236class ExtendedAttributeTests(unittest.TestCase):
3237
Larry Hastings9cf065c2012-06-22 16:30:09 -07003238 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003239 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003240 self.addCleanup(support.unlink, fn)
3241 create_file(fn)
3242
Benjamin Peterson799bd802011-08-31 22:15:17 -04003243 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003244 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003245 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003246
Victor Stinnerf12e5062011-10-16 22:12:03 +02003247 init_xattr = listxattr(fn)
3248 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003249
Larry Hastings9cf065c2012-06-22 16:30:09 -07003250 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003251 xattr = set(init_xattr)
3252 xattr.add("user.test")
3253 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003254 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3255 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3256 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003257
Benjamin Peterson799bd802011-08-31 22:15:17 -04003258 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003259 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003260 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003261
Benjamin Peterson799bd802011-08-31 22:15:17 -04003262 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003263 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003264 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003265
Larry Hastings9cf065c2012-06-22 16:30:09 -07003266 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003267 xattr.add("user.test2")
3268 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003269 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003270
Benjamin Peterson799bd802011-08-31 22:15:17 -04003271 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003272 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003273 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003274
Victor Stinnerf12e5062011-10-16 22:12:03 +02003275 xattr.remove("user.test")
3276 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003277 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3278 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3279 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3280 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003281 many = sorted("user.test{}".format(i) for i in range(100))
3282 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003283 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003284 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003285
Larry Hastings9cf065c2012-06-22 16:30:09 -07003286 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003287 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003288 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003289
3290 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3291 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003292
3293 def test_simple(self):
3294 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3295 os.listxattr)
3296
3297 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003298 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3299 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003300
3301 def test_fds(self):
3302 def getxattr(path, *args):
3303 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003304 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003305 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003306 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003307 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003308 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003309 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003310 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003311 def listxattr(path, *args):
3312 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003313 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003314 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3315
3316
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003317@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3318class TermsizeTests(unittest.TestCase):
3319 def test_does_not_crash(self):
3320 """Check if get_terminal_size() returns a meaningful value.
3321
3322 There's no easy portable way to actually check the size of the
3323 terminal, so let's check if it returns something sensible instead.
3324 """
3325 try:
3326 size = os.get_terminal_size()
3327 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003328 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003329 # Under win32 a generic OSError can be thrown if the
3330 # handle cannot be retrieved
3331 self.skipTest("failed to query terminal size")
3332 raise
3333
Antoine Pitroucfade362012-02-08 23:48:59 +01003334 self.assertGreaterEqual(size.columns, 0)
3335 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003336
3337 def test_stty_match(self):
3338 """Check if stty returns the same results
3339
3340 stty actually tests stdin, so get_terminal_size is invoked on
3341 stdin explicitly. If stty succeeded, then get_terminal_size()
3342 should work too.
3343 """
3344 try:
3345 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003346 except (FileNotFoundError, subprocess.CalledProcessError,
3347 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003348 self.skipTest("stty invocation failed")
3349 expected = (int(size[1]), int(size[0])) # reversed order
3350
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003351 try:
3352 actual = os.get_terminal_size(sys.__stdin__.fileno())
3353 except OSError as e:
3354 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3355 # Under win32 a generic OSError can be thrown if the
3356 # handle cannot be retrieved
3357 self.skipTest("failed to query terminal size")
3358 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003359 self.assertEqual(expected, actual)
3360
3361
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003362@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003363@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003364class MemfdCreateTests(unittest.TestCase):
3365 def test_memfd_create(self):
3366 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3367 self.assertNotEqual(fd, -1)
3368 self.addCleanup(os.close, fd)
3369 self.assertFalse(os.get_inheritable(fd))
3370 with open(fd, "wb", closefd=False) as f:
3371 f.write(b'memfd_create')
3372 self.assertEqual(f.tell(), 12)
3373
3374 fd2 = os.memfd_create("Hi")
3375 self.addCleanup(os.close, fd2)
3376 self.assertFalse(os.get_inheritable(fd2))
3377
3378
Victor Stinner292c8352012-10-30 02:17:38 +01003379class OSErrorTests(unittest.TestCase):
3380 def setUp(self):
3381 class Str(str):
3382 pass
3383
Victor Stinnerafe17062012-10-31 22:47:43 +01003384 self.bytes_filenames = []
3385 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003386 if support.TESTFN_UNENCODABLE is not None:
3387 decoded = support.TESTFN_UNENCODABLE
3388 else:
3389 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003390 self.unicode_filenames.append(decoded)
3391 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003392 if support.TESTFN_UNDECODABLE is not None:
3393 encoded = support.TESTFN_UNDECODABLE
3394 else:
3395 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003396 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003397 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003398 self.bytes_filenames.append(memoryview(encoded))
3399
3400 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003401
3402 def test_oserror_filename(self):
3403 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003404 (self.filenames, os.chdir,),
3405 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003406 (self.filenames, os.lstat,),
3407 (self.filenames, os.open, os.O_RDONLY),
3408 (self.filenames, os.rmdir,),
3409 (self.filenames, os.stat,),
3410 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003411 ]
3412 if sys.platform == "win32":
3413 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003414 (self.bytes_filenames, os.rename, b"dst"),
3415 (self.bytes_filenames, os.replace, b"dst"),
3416 (self.unicode_filenames, os.rename, "dst"),
3417 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003418 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003419 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003420 else:
3421 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003422 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003423 (self.filenames, os.rename, "dst"),
3424 (self.filenames, os.replace, "dst"),
3425 ))
3426 if hasattr(os, "chown"):
3427 funcs.append((self.filenames, os.chown, 0, 0))
3428 if hasattr(os, "lchown"):
3429 funcs.append((self.filenames, os.lchown, 0, 0))
3430 if hasattr(os, "truncate"):
3431 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003432 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003433 funcs.append((self.filenames, os.chflags, 0))
3434 if hasattr(os, "lchflags"):
3435 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003436 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003437 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003438 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003439 if sys.platform == "win32":
3440 funcs.append((self.bytes_filenames, os.link, b"dst"))
3441 funcs.append((self.unicode_filenames, os.link, "dst"))
3442 else:
3443 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003444 if hasattr(os, "listxattr"):
3445 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003446 (self.filenames, os.listxattr,),
3447 (self.filenames, os.getxattr, "user.test"),
3448 (self.filenames, os.setxattr, "user.test", b'user'),
3449 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003450 ))
3451 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003452 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003453 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003454 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003455
Steve Dowercc16be82016-09-08 10:35:16 -07003456
Victor Stinnerafe17062012-10-31 22:47:43 +01003457 for filenames, func, *func_args in funcs:
3458 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003459 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003460 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003461 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003462 else:
3463 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3464 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003465 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003466 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003467 except UnicodeDecodeError:
3468 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003469 else:
3470 self.fail("No exception thrown by {}".format(func))
3471
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003472class CPUCountTests(unittest.TestCase):
3473 def test_cpu_count(self):
3474 cpus = os.cpu_count()
3475 if cpus is not None:
3476 self.assertIsInstance(cpus, int)
3477 self.assertGreater(cpus, 0)
3478 else:
3479 self.skipTest("Could not determine the number of CPUs")
3480
Victor Stinnerdaf45552013-08-28 00:53:59 +02003481
3482class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003483 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003484 fd = os.open(__file__, os.O_RDONLY)
3485 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003486 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003487
Victor Stinnerdaf45552013-08-28 00:53:59 +02003488 os.set_inheritable(fd, True)
3489 self.assertEqual(os.get_inheritable(fd), True)
3490
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003491 @unittest.skipIf(fcntl is None, "need fcntl")
3492 def test_get_inheritable_cloexec(self):
3493 fd = os.open(__file__, os.O_RDONLY)
3494 self.addCleanup(os.close, fd)
3495 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003496
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003497 # clear FD_CLOEXEC flag
3498 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3499 flags &= ~fcntl.FD_CLOEXEC
3500 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003501
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003502 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003503
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003504 @unittest.skipIf(fcntl is None, "need fcntl")
3505 def test_set_inheritable_cloexec(self):
3506 fd = os.open(__file__, os.O_RDONLY)
3507 self.addCleanup(os.close, fd)
3508 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3509 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003510
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003511 os.set_inheritable(fd, True)
3512 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3513 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003514
Victor Stinnerdaf45552013-08-28 00:53:59 +02003515 def test_open(self):
3516 fd = os.open(__file__, os.O_RDONLY)
3517 self.addCleanup(os.close, fd)
3518 self.assertEqual(os.get_inheritable(fd), False)
3519
3520 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3521 def test_pipe(self):
3522 rfd, wfd = os.pipe()
3523 self.addCleanup(os.close, rfd)
3524 self.addCleanup(os.close, wfd)
3525 self.assertEqual(os.get_inheritable(rfd), False)
3526 self.assertEqual(os.get_inheritable(wfd), False)
3527
3528 def test_dup(self):
3529 fd1 = os.open(__file__, os.O_RDONLY)
3530 self.addCleanup(os.close, fd1)
3531
3532 fd2 = os.dup(fd1)
3533 self.addCleanup(os.close, fd2)
3534 self.assertEqual(os.get_inheritable(fd2), False)
3535
Zackery Spytz5be66602019-08-23 12:38:41 -06003536 def test_dup_standard_stream(self):
3537 fd = os.dup(1)
3538 self.addCleanup(os.close, fd)
3539 self.assertGreater(fd, 0)
3540
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003541 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3542 def test_dup_nul(self):
3543 # os.dup() was creating inheritable fds for character files.
3544 fd1 = os.open('NUL', os.O_RDONLY)
3545 self.addCleanup(os.close, fd1)
3546 fd2 = os.dup(fd1)
3547 self.addCleanup(os.close, fd2)
3548 self.assertFalse(os.get_inheritable(fd2))
3549
Victor Stinnerdaf45552013-08-28 00:53:59 +02003550 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3551 def test_dup2(self):
3552 fd = os.open(__file__, os.O_RDONLY)
3553 self.addCleanup(os.close, fd)
3554
3555 # inheritable by default
3556 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003557 self.addCleanup(os.close, fd2)
3558 self.assertEqual(os.dup2(fd, fd2), fd2)
3559 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003560
3561 # force non-inheritable
3562 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003563 self.addCleanup(os.close, fd3)
3564 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3565 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003566
3567 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3568 def test_openpty(self):
3569 master_fd, slave_fd = os.openpty()
3570 self.addCleanup(os.close, master_fd)
3571 self.addCleanup(os.close, slave_fd)
3572 self.assertEqual(os.get_inheritable(master_fd), False)
3573 self.assertEqual(os.get_inheritable(slave_fd), False)
3574
3575
Brett Cannon3f9183b2016-08-26 14:44:48 -07003576class PathTConverterTests(unittest.TestCase):
3577 # tuples of (function name, allows fd arguments, additional arguments to
3578 # function, cleanup function)
3579 functions = [
3580 ('stat', True, (), None),
3581 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003582 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003583 ('chflags', False, (0,), None),
3584 ('lchflags', False, (0,), None),
3585 ('open', False, (0,), getattr(os, 'close', None)),
3586 ]
3587
3588 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003589 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003590 if os.name == 'nt':
3591 bytes_fspath = bytes_filename = None
3592 else:
3593 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003594 bytes_fspath = FakePath(bytes_filename)
3595 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003596 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003597 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003598
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003599 int_fspath = FakePath(fd)
3600 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003601
3602 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3603 with self.subTest(name=name):
3604 try:
3605 fn = getattr(os, name)
3606 except AttributeError:
3607 continue
3608
Brett Cannon8f96a302016-08-26 19:30:11 -07003609 for path in (str_filename, bytes_filename, str_fspath,
3610 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003611 if path is None:
3612 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003613 with self.subTest(name=name, path=path):
3614 result = fn(path, *extra_args)
3615 if cleanup_fn is not None:
3616 cleanup_fn(result)
3617
3618 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003619 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003620 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003621
3622 if allow_fd:
3623 result = fn(fd, *extra_args) # should not fail
3624 if cleanup_fn is not None:
3625 cleanup_fn(result)
3626 else:
3627 with self.assertRaisesRegex(
3628 TypeError,
3629 'os.PathLike'):
3630 fn(fd, *extra_args)
3631
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003632 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003633 msg = r'__fspath__\(\) to return str or bytes, not %s'
3634 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003635 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003636 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003637 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003638 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003639 os.stat(FakePath(object()))
3640
Brett Cannon3f9183b2016-08-26 14:44:48 -07003641
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003642@unittest.skipUnless(hasattr(os, 'get_blocking'),
3643 'needs os.get_blocking() and os.set_blocking()')
3644class BlockingTests(unittest.TestCase):
3645 def test_blocking(self):
3646 fd = os.open(__file__, os.O_RDONLY)
3647 self.addCleanup(os.close, fd)
3648 self.assertEqual(os.get_blocking(fd), True)
3649
3650 os.set_blocking(fd, False)
3651 self.assertEqual(os.get_blocking(fd), False)
3652
3653 os.set_blocking(fd, True)
3654 self.assertEqual(os.get_blocking(fd), True)
3655
3656
Yury Selivanov97e2e062014-09-26 12:33:06 -04003657
3658class ExportsTests(unittest.TestCase):
3659 def test_os_all(self):
3660 self.assertIn('open', os.__all__)
3661 self.assertIn('walk', os.__all__)
3662
3663
Eddie Elizondob3966632019-11-05 07:16:14 -08003664class TestDirEntry(unittest.TestCase):
3665 def setUp(self):
3666 self.path = os.path.realpath(support.TESTFN)
3667 self.addCleanup(support.rmtree, self.path)
3668 os.mkdir(self.path)
3669
3670 def test_uninstantiable(self):
3671 self.assertRaises(TypeError, os.DirEntry)
3672
3673 def test_unpickable(self):
3674 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3675 entry = [entry for entry in os.scandir(self.path)].pop()
3676 self.assertIsInstance(entry, os.DirEntry)
3677 self.assertEqual(entry.name, "file.txt")
3678 import pickle
3679 self.assertRaises(TypeError, pickle.dumps, entry, filename)
3680
3681
Victor Stinner6036e442015-03-08 01:58:04 +01003682class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003683 check_no_resource_warning = support.check_no_resource_warning
3684
Victor Stinner6036e442015-03-08 01:58:04 +01003685 def setUp(self):
3686 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003687 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003688 self.addCleanup(support.rmtree, self.path)
3689 os.mkdir(self.path)
3690
3691 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003692 path = self.bytes_path if isinstance(name, bytes) else self.path
3693 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003694 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003695 return filename
3696
3697 def get_entries(self, names):
3698 entries = dict((entry.name, entry)
3699 for entry in os.scandir(self.path))
3700 self.assertEqual(sorted(entries.keys()), names)
3701 return entries
3702
3703 def assert_stat_equal(self, stat1, stat2, skip_fields):
3704 if skip_fields:
3705 for attr in dir(stat1):
3706 if not attr.startswith("st_"):
3707 continue
3708 if attr in ("st_dev", "st_ino", "st_nlink"):
3709 continue
3710 self.assertEqual(getattr(stat1, attr),
3711 getattr(stat2, attr),
3712 (stat1, stat2, attr))
3713 else:
3714 self.assertEqual(stat1, stat2)
3715
Eddie Elizondob3966632019-11-05 07:16:14 -08003716 def test_uninstantiable(self):
3717 scandir_iter = os.scandir(self.path)
3718 self.assertRaises(TypeError, type(scandir_iter))
3719 scandir_iter.close()
3720
3721 def test_unpickable(self):
3722 filename = self.create_file("file.txt")
3723 scandir_iter = os.scandir(self.path)
3724 import pickle
3725 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3726 scandir_iter.close()
3727
Victor Stinner6036e442015-03-08 01:58:04 +01003728 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003729 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003730 self.assertEqual(entry.name, name)
3731 self.assertEqual(entry.path, os.path.join(self.path, name))
3732 self.assertEqual(entry.inode(),
3733 os.stat(entry.path, follow_symlinks=False).st_ino)
3734
3735 entry_stat = os.stat(entry.path)
3736 self.assertEqual(entry.is_dir(),
3737 stat.S_ISDIR(entry_stat.st_mode))
3738 self.assertEqual(entry.is_file(),
3739 stat.S_ISREG(entry_stat.st_mode))
3740 self.assertEqual(entry.is_symlink(),
3741 os.path.islink(entry.path))
3742
3743 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3744 self.assertEqual(entry.is_dir(follow_symlinks=False),
3745 stat.S_ISDIR(entry_lstat.st_mode))
3746 self.assertEqual(entry.is_file(follow_symlinks=False),
3747 stat.S_ISREG(entry_lstat.st_mode))
3748
3749 self.assert_stat_equal(entry.stat(),
3750 entry_stat,
3751 os.name == 'nt' and not is_symlink)
3752 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3753 entry_lstat,
3754 os.name == 'nt')
3755
3756 def test_attributes(self):
3757 link = hasattr(os, 'link')
3758 symlink = support.can_symlink()
3759
3760 dirname = os.path.join(self.path, "dir")
3761 os.mkdir(dirname)
3762 filename = self.create_file("file.txt")
3763 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003764 try:
3765 os.link(filename, os.path.join(self.path, "link_file.txt"))
3766 except PermissionError as e:
3767 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003768 if symlink:
3769 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3770 target_is_directory=True)
3771 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3772
3773 names = ['dir', 'file.txt']
3774 if link:
3775 names.append('link_file.txt')
3776 if symlink:
3777 names.extend(('symlink_dir', 'symlink_file.txt'))
3778 entries = self.get_entries(names)
3779
3780 entry = entries['dir']
3781 self.check_entry(entry, 'dir', True, False, False)
3782
3783 entry = entries['file.txt']
3784 self.check_entry(entry, 'file.txt', False, True, False)
3785
3786 if link:
3787 entry = entries['link_file.txt']
3788 self.check_entry(entry, 'link_file.txt', False, True, False)
3789
3790 if symlink:
3791 entry = entries['symlink_dir']
3792 self.check_entry(entry, 'symlink_dir', True, False, True)
3793
3794 entry = entries['symlink_file.txt']
3795 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3796
3797 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003798 path = self.bytes_path if isinstance(name, bytes) else self.path
3799 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003800 self.assertEqual(len(entries), 1)
3801
3802 entry = entries[0]
3803 self.assertEqual(entry.name, name)
3804 return entry
3805
Brett Cannon96881cd2016-06-10 14:37:21 -07003806 def create_file_entry(self, name='file.txt'):
3807 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003808 return self.get_entry(os.path.basename(filename))
3809
3810 def test_current_directory(self):
3811 filename = self.create_file()
3812 old_dir = os.getcwd()
3813 try:
3814 os.chdir(self.path)
3815
3816 # call scandir() without parameter: it must list the content
3817 # of the current directory
3818 entries = dict((entry.name, entry) for entry in os.scandir())
3819 self.assertEqual(sorted(entries.keys()),
3820 [os.path.basename(filename)])
3821 finally:
3822 os.chdir(old_dir)
3823
3824 def test_repr(self):
3825 entry = self.create_file_entry()
3826 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3827
Brett Cannon96881cd2016-06-10 14:37:21 -07003828 def test_fspath_protocol(self):
3829 entry = self.create_file_entry()
3830 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3831
3832 def test_fspath_protocol_bytes(self):
3833 bytes_filename = os.fsencode('bytesfile.txt')
3834 bytes_entry = self.create_file_entry(name=bytes_filename)
3835 fspath = os.fspath(bytes_entry)
3836 self.assertIsInstance(fspath, bytes)
3837 self.assertEqual(fspath,
3838 os.path.join(os.fsencode(self.path),bytes_filename))
3839
Victor Stinner6036e442015-03-08 01:58:04 +01003840 def test_removed_dir(self):
3841 path = os.path.join(self.path, 'dir')
3842
3843 os.mkdir(path)
3844 entry = self.get_entry('dir')
3845 os.rmdir(path)
3846
3847 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3848 if os.name == 'nt':
3849 self.assertTrue(entry.is_dir())
3850 self.assertFalse(entry.is_file())
3851 self.assertFalse(entry.is_symlink())
3852 if os.name == 'nt':
3853 self.assertRaises(FileNotFoundError, entry.inode)
3854 # don't fail
3855 entry.stat()
3856 entry.stat(follow_symlinks=False)
3857 else:
3858 self.assertGreater(entry.inode(), 0)
3859 self.assertRaises(FileNotFoundError, entry.stat)
3860 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3861
3862 def test_removed_file(self):
3863 entry = self.create_file_entry()
3864 os.unlink(entry.path)
3865
3866 self.assertFalse(entry.is_dir())
3867 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3868 if os.name == 'nt':
3869 self.assertTrue(entry.is_file())
3870 self.assertFalse(entry.is_symlink())
3871 if os.name == 'nt':
3872 self.assertRaises(FileNotFoundError, entry.inode)
3873 # don't fail
3874 entry.stat()
3875 entry.stat(follow_symlinks=False)
3876 else:
3877 self.assertGreater(entry.inode(), 0)
3878 self.assertRaises(FileNotFoundError, entry.stat)
3879 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3880
3881 def test_broken_symlink(self):
3882 if not support.can_symlink():
3883 return self.skipTest('cannot create symbolic link')
3884
3885 filename = self.create_file("file.txt")
3886 os.symlink(filename,
3887 os.path.join(self.path, "symlink.txt"))
3888 entries = self.get_entries(['file.txt', 'symlink.txt'])
3889 entry = entries['symlink.txt']
3890 os.unlink(filename)
3891
3892 self.assertGreater(entry.inode(), 0)
3893 self.assertFalse(entry.is_dir())
3894 self.assertFalse(entry.is_file()) # broken symlink returns False
3895 self.assertFalse(entry.is_dir(follow_symlinks=False))
3896 self.assertFalse(entry.is_file(follow_symlinks=False))
3897 self.assertTrue(entry.is_symlink())
3898 self.assertRaises(FileNotFoundError, entry.stat)
3899 # don't fail
3900 entry.stat(follow_symlinks=False)
3901
3902 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003903 self.create_file("file.txt")
3904
3905 path_bytes = os.fsencode(self.path)
3906 entries = list(os.scandir(path_bytes))
3907 self.assertEqual(len(entries), 1, entries)
3908 entry = entries[0]
3909
3910 self.assertEqual(entry.name, b'file.txt')
3911 self.assertEqual(entry.path,
3912 os.fsencode(os.path.join(self.path, 'file.txt')))
3913
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003914 def test_bytes_like(self):
3915 self.create_file("file.txt")
3916
3917 for cls in bytearray, memoryview:
3918 path_bytes = cls(os.fsencode(self.path))
3919 with self.assertWarns(DeprecationWarning):
3920 entries = list(os.scandir(path_bytes))
3921 self.assertEqual(len(entries), 1, entries)
3922 entry = entries[0]
3923
3924 self.assertEqual(entry.name, b'file.txt')
3925 self.assertEqual(entry.path,
3926 os.fsencode(os.path.join(self.path, 'file.txt')))
3927 self.assertIs(type(entry.name), bytes)
3928 self.assertIs(type(entry.path), bytes)
3929
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003930 @unittest.skipUnless(os.listdir in os.supports_fd,
3931 'fd support for listdir required for this test.')
3932 def test_fd(self):
3933 self.assertIn(os.scandir, os.supports_fd)
3934 self.create_file('file.txt')
3935 expected_names = ['file.txt']
3936 if support.can_symlink():
3937 os.symlink('file.txt', os.path.join(self.path, 'link'))
3938 expected_names.append('link')
3939
3940 fd = os.open(self.path, os.O_RDONLY)
3941 try:
3942 with os.scandir(fd) as it:
3943 entries = list(it)
3944 names = [entry.name for entry in entries]
3945 self.assertEqual(sorted(names), expected_names)
3946 self.assertEqual(names, os.listdir(fd))
3947 for entry in entries:
3948 self.assertEqual(entry.path, entry.name)
3949 self.assertEqual(os.fspath(entry), entry.name)
3950 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3951 if os.stat in os.supports_dir_fd:
3952 st = os.stat(entry.name, dir_fd=fd)
3953 self.assertEqual(entry.stat(), st)
3954 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3955 self.assertEqual(entry.stat(follow_symlinks=False), st)
3956 finally:
3957 os.close(fd)
3958
Victor Stinner6036e442015-03-08 01:58:04 +01003959 def test_empty_path(self):
3960 self.assertRaises(FileNotFoundError, os.scandir, '')
3961
3962 def test_consume_iterator_twice(self):
3963 self.create_file("file.txt")
3964 iterator = os.scandir(self.path)
3965
3966 entries = list(iterator)
3967 self.assertEqual(len(entries), 1, entries)
3968
3969 # check than consuming the iterator twice doesn't raise exception
3970 entries2 = list(iterator)
3971 self.assertEqual(len(entries2), 0, entries2)
3972
3973 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003974 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003975 self.assertRaises(TypeError, os.scandir, obj)
3976
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003977 def test_close(self):
3978 self.create_file("file.txt")
3979 self.create_file("file2.txt")
3980 iterator = os.scandir(self.path)
3981 next(iterator)
3982 iterator.close()
3983 # multiple closes
3984 iterator.close()
3985 with self.check_no_resource_warning():
3986 del iterator
3987
3988 def test_context_manager(self):
3989 self.create_file("file.txt")
3990 self.create_file("file2.txt")
3991 with os.scandir(self.path) as iterator:
3992 next(iterator)
3993 with self.check_no_resource_warning():
3994 del iterator
3995
3996 def test_context_manager_close(self):
3997 self.create_file("file.txt")
3998 self.create_file("file2.txt")
3999 with os.scandir(self.path) as iterator:
4000 next(iterator)
4001 iterator.close()
4002
4003 def test_context_manager_exception(self):
4004 self.create_file("file.txt")
4005 self.create_file("file2.txt")
4006 with self.assertRaises(ZeroDivisionError):
4007 with os.scandir(self.path) as iterator:
4008 next(iterator)
4009 1/0
4010 with self.check_no_resource_warning():
4011 del iterator
4012
4013 def test_resource_warning(self):
4014 self.create_file("file.txt")
4015 self.create_file("file2.txt")
4016 iterator = os.scandir(self.path)
4017 next(iterator)
4018 with self.assertWarns(ResourceWarning):
4019 del iterator
4020 support.gc_collect()
4021 # exhausted iterator
4022 iterator = os.scandir(self.path)
4023 list(iterator)
4024 with self.check_no_resource_warning():
4025 del iterator
4026
Victor Stinner6036e442015-03-08 01:58:04 +01004027
Ethan Furmancdc08792016-06-02 15:06:09 -07004028class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004029
4030 # Abstracted so it can be overridden to test pure Python implementation
4031 # if a C version is provided.
4032 fspath = staticmethod(os.fspath)
4033
Ethan Furmancdc08792016-06-02 15:06:09 -07004034 def test_return_bytes(self):
4035 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004036 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004037
4038 def test_return_string(self):
4039 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004040 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004041
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004042 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004043 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004044 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004045
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004046 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004047 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4048 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4049
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004050 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004051 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4052 self.assertTrue(issubclass(FakePath, os.PathLike))
4053 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004054
Ethan Furmancdc08792016-06-02 15:06:09 -07004055 def test_garbage_in_exception_out(self):
4056 vapor = type('blah', (), {})
4057 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004058 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004059
4060 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004061 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004062
Brett Cannon044283a2016-07-15 10:41:49 -07004063 def test_bad_pathlike(self):
4064 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004065 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004066 # __fspath__ attribute that is not callable.
4067 c = type('foo', (), {})
4068 c.__fspath__ = 1
4069 self.assertRaises(TypeError, self.fspath, c())
4070 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004071 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004072 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004073
Bar Hareleae87e32019-12-22 11:57:27 +02004074 def test_pathlike_subclasshook(self):
4075 # bpo-38878: subclasshook causes subclass checks
4076 # true on abstract implementation.
4077 class A(os.PathLike):
4078 pass
4079 self.assertFalse(issubclass(FakePath, A))
4080 self.assertTrue(issubclass(FakePath, os.PathLike))
4081
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004082 def test_pathlike_class_getitem(self):
4083 self.assertIs(os.PathLike[bytes], os.PathLike)
4084
Victor Stinnerc29b5852017-11-02 07:28:27 -07004085
4086class TimesTests(unittest.TestCase):
4087 def test_times(self):
4088 times = os.times()
4089 self.assertIsInstance(times, os.times_result)
4090
4091 for field in ('user', 'system', 'children_user', 'children_system',
4092 'elapsed'):
4093 value = getattr(times, field)
4094 self.assertIsInstance(value, float)
4095
4096 if os.name == 'nt':
4097 self.assertEqual(times.children_user, 0)
4098 self.assertEqual(times.children_system, 0)
4099 self.assertEqual(times.elapsed, 0)
4100
4101
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004102# Only test if the C version is provided, otherwise TestPEP519 already tested
4103# the pure Python implementation.
4104if hasattr(os, "_fspath"):
4105 class TestPEP519PurePython(TestPEP519):
4106
4107 """Explicitly test the pure Python implementation of os.fspath()."""
4108
4109 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004110
4111
Fred Drake2e2be372001-09-20 21:33:42 +00004112if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004113 unittest.main()