blob: 4a076e3bbf5426bbf1ef0522938e3457b8e428c0 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dower9eb3d542019-08-21 15:52:42 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Miss Islington (bot)68c1c392019-06-28 09:23:06 -070025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
# True only when os.geteuid() exists and reports the superuser (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted when the platform check succeeds.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd')
                    else False)
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists.

    The skip reason is ``'requires os.<name>'``.
    """
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
79
80
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so
    open() raises FileExistsError when *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as new_file:
        new_file.write(content)
84
85
Miss Islington (bot)63429c82019-06-26 09:14:30 -070086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Miss Islington (bot)68c1c392019-06-28 09:23:06 -070091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Miss Islington (bot)e3761ca2019-06-28 11:07:10 -0700109 with support.change_cwd(tmpdir) as path:
Miss Islington (bot)68c1c392019-06-28 09:23:06 -0700110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Miss Islington (bot)63429c82019-06-26 09:14:30 -0700144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Tests for low-level file functions of the os module.  Every test starts
    # and ends with support.TESTFN removed.

    def setUp(self):
        # lexists() is used rather than exists() so that a symbolic link whose
        # target is missing is removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file created with O_CREAT|O_RDWR must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes previously written.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when one of the checks above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command *args* in a new console and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open support.TESTFN read-only and wrap the descriptor with
        # os.fdopen(fd, *args); closing the file object closes the descriptor.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # os.fdopen() accepts no extra argument, a mode, and a mode plus a
        # buffer size.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() overwrites an existing destination and removes the
        # source name.
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Re-open the destination under its own name so the source
            # handle registered for cleanup above is not shadowed.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Re-open the destination under its own name so the source
            # handle registered for cleanup above is not shadowed.
            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Miss Islington (bot)cad7abf2019-09-04 15:18:05 -0700594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
595 def test_stat_block_device(self):
596 # bpo-38030: os.stat fails for block devices
597 # Test a filename like "//./C:"
598 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
599 result = os.stat(fname)
600 self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
723 def _test_utime_current(self, set_time):
724 # Get the system clock
725 current = time.time()
726
727 # Call os.utime() to set the timestamp to the current system clock
728 set_time(self.fname)
729
730 if not self.support_subsecond(self.fname):
731 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700732 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200733 # On Windows, the usual resolution of time.time() is 15.6 ms.
734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 #
736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
737 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200738 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200739 st = os.stat(self.fname)
740 msg = ("st_time=%r, current=%r, dt=%r"
741 % (st.st_mtime, current, st.st_mtime - current))
742 self.assertAlmostEqual(st.st_mtime, current,
743 delta=delta, msg=msg)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
def get_file_system(self, path):
    """Return the file system name of the volume holding *path*.

    Only implemented for Windows, where the name is queried with
    GetVolumeInformationW().  Returns None on every other platform and
    when the query fails.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    # ctypes.windll only exists on Windows, hence the local import.
    import ctypes
    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    kernel32 = ctypes.windll.kernel32
    buf = ctypes.create_unicode_buffer("", 100)
    ok = kernel32.GetVolumeInformationW(root, None, 0,
                                        None, None, None,
                                        buf, len(buf))
    return buf.value if ok else None
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes view where available)
        # so tearDown() can restore it, then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        # Put back the environment captured by setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Items installed into os.environ by setUp().
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, emptied in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        # A variable set through update() must be visible to a child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            value = pipe.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        # The object returned by os.popen() yields its output line by line.
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            with self.assertRaises(StopIteration):
                next(lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        # Every reference item installed by setUp() must be readable.
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        entries = ', '.join('{!r}: {!r}'.format(name, content)
                            for name, content in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(entries))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        custom_env = {'PATH': os.pathsep.join(search_dirs)}

        original_environ = os.environ
        try:
            os.environ = dict(custom_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(custom_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            with self.assertRaises(ValueError):
                os.get_exec_path(mixed_env)

        # bytes key and/or value
        bytes_envs = (
            {b'PATH': b'abc'},
            {b'PATH': 'abc'},
            {'PATH': b'abc'},
        )
        for env in bytes_envs:
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            bad_key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            bad_key, expected_error = 'key=', OSError
        with self.assertRaises(expected_error):
            del os.environ[bad_key]

    def test_key_type(self):
        # A failed lookup or deletion raises KeyError carrying the very
        # key object that was passed in, with the context suppressed.
        absent = 'missingkey'
        self.assertNotIn(absent, os.environ)

        with self.assertRaises(KeyError) as caught:
            os.environ[absent]
        lookup_error = caught.exception
        self.assertIs(lookup_error.args[0], absent)
        self.assertTrue(lookup_error.__suppress_context__)

        with self.assertRaises(KeyError) as caught:
            del os.environ[absent]
        delete_error = caught.exception
        self.assertIs(delete_error.args[0], absent)
        self.assertTrue(delete_error.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Adding a key to os.environ while *collection* is being iterated
        # must not make the next step of the iteration fail.
        walker = iter(collection)
        extra_key = "__new_key__"

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[extra_key] = "test_environ_iteration"

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[extra_key],
                             "test_environ_iteration")
        finally:
            del os.environ[extra_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
1005
Victor Stinner6d101392013-04-14 16:35:04 +02001006
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # Named "tmp5" so that the file matches the layout above and the
        # target of broken_link3 below.
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Try to make SUB21 unreadable.  Where that has no effect (the
        # directory can still be listed), remove SUB21 altogether and
        # drop it from the expected tree.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Restore access at cleanup time so the tree can be removed.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        walked[0][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        walked[3][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 after the top directory has been listed: the walk
        # must report an error for it and still visit the other entries.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)

    def test_walk_many_open_files(self):
        # Advance 100 walks over a 30-level deep tree in lockstep, first
        # bottom-up and then top-down.
        depth = 30
        base = os.path.join(support.TESTFN, 'deep')
        p = os.path.join(base, *(['d']*depth))
        os.makedirs(p)

        iters = [self.walk(base, topdown=False) for j in range(100)]
        for i in range(depth + 1):
            expected = (p, ['d'] if i else [], [])
            for it in iters:
                self.assertEqual(next(it), expected)
            p = os.path.dirname(p)

        iters = [self.walk(base, topdown=True) for j in range(100)]
        p = base
        for i in range(depth + 1):
            expected = (p, ['d'] if i < depth else [], [])
            for it in iters:
                self.assertEqual(next(it), expected)
            p = os.path.join(p, 'd')
1201
Charles-François Natali7372b062012-02-05 15:15:38 +01001202
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory descriptor so that the inherited WalkTests
        # cases see the same 3-tuples that os.walk() yields.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single entry point to os.fwalk(), so subclasses can override it.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then run through every combination
        of topdown and follow_symlinks; each directory reported by fwalk()
        must have been reported by os.walk() with the same sets of
        directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not mask that failure with a NameError on fd.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1273
1274
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Walk with a bytes path, but hand str names to the inherited
        # tests so their expectations can stay unchanged.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        bytes_top = os.fsencode(top)
        for broot, bdirs, bfiles in os.walk(bytes_top, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Copy any in-place edits the caller made (e.g. pruning)
            # back into the lists that os.walk() itself is using.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1287
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Walk with a bytes path, but hand str names to the inherited
        # tests so their expectations can stay unchanged.
        bytes_top = os.fsencode(top)
        for broot, bdirs, bfiles, topfd in os.fwalk(bytes_top, *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # Copy any in-place edits the caller made (e.g. pruning)
            # back into the lists that os.fwalk() itself is using.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1299
Charles-François Natali7372b062012-02-05 15:15:38 +01001300
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The requested mode applies to the leaf directory only;
                # the intermediate directory is expected to get the
                # umask-derived default.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when one of the
        # assertions below fails, so a failure here cannot leak a changed
        # umask into the tests that run afterwards.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file even if an assertion failed: tearDown() uses
            # os.removedirs(), which cannot delete a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1393
Andrew Svetlov405faed2012-12-25 12:18:09 +02001394
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown() run against one directory shared by the class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # uid and gid must be real integers: floats, complex numbers,
        # Decimal and Fraction values are rejected with TypeError.
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        rejected_values = (
            -1.0,
            -1j,
            decimal.Decimal(-1),
            fractions.Fraction(-2, 2),
        )
        for value in rejected_values:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        candidate_gids = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the directory to each of the two groups in turn and check
        # that the change is visible through os.stat().
        for wanted_gid in candidate_gids:
            os.chown(support.TESTFN, uid, wanted_gid)
            current_gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(current_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        candidate_uids = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in candidate_uids:
            os.chown(support.TESTFN, wanted_uid, gid)
            current_uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(current_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)
1453
1454
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_chain(self):
        # Create TESTFN/dira/dirb and return both paths as (dira, dirb).
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_chain()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in 'dira' stops the upward removal after 'dirb'.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in the leaf directory makes removedirs() fail outright.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1494
1495
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # Write through an unbuffered binary file; the ``with`` statement
        # closes it, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Nothing that was written may ever be read back.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001503
Andrew Svetlov405faed2012-12-25 12:18:09 +02001504
class URandomTests(unittest.TestCase):
    """Basic sanity checks for os.urandom()."""

    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* random bytes produced by a child interpreter."""
        script = (
            'import os, sys\n'
            'data = os.urandom(%s)\n'
            'sys.stdout.buffer.write(data)\n'
            'sys.stdout.buffer.flush()'
        ) % count
        result = assert_python_ok('-c', script)
        # Index 1 of the helper's result is the child's captured stdout.
        child_stdout = result[1]
        self.assertEqual(len(child_stdout), count)
        return child_stdout

    def test_urandom_subprocess(self):
        # Two independent processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001534
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001535
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        # Probe the syscall once; skip the whole class if it is missing.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        # (BlockingIOError only means the system urandom is not
        # initialized yet, which is acceptable).
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1577
1578
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# Any one of these build-time configuration flags being set to 1 is enough.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001585
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter because each one tampers with
    process-wide file descriptor state.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters; its output is not checked.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The two reads within one child must differ from each other, and
        # two separate children must not produce the same output either.
        _, out, _ = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        _, out2, _ = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1665
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001666
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001667@contextlib.contextmanager
1668def _execvpe_mockup(defpath=None):
1669 """
1670 Stubs out execv and execve functions when used as context manager.
1671 Records exec calls. The mock execv and execve functions always raise an
1672 exception as they would normally never return.
1673 """
1674 # A list of tuples containing (function name, first arg, args)
1675 # of calls to execv or execve that have been made.
1676 calls = []
1677
1678 def mock_execv(name, *args):
1679 calls.append(('execv', name, args))
1680 raise RuntimeError("execv called")
1681
1682 def mock_execve(name, *args):
1683 calls.append(('execve', name, args))
1684 raise OSError(errno.ENOTDIR, "execve called")
1685
1686 try:
1687 orig_execv = os.execv
1688 orig_execve = os.execve
1689 orig_defpath = os.defpath
1690 os.execv = mock_execv
1691 os.execve = mock_execve
1692 if defpath is not None:
1693 os.defpath = defpath
1694 yield calls
1695 finally:
1696 os.execv = orig_execv
1697 os.execve = orig_execve
1698 os.defpath = orig_defpath
1699
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Argument validation and lookup behaviour of the os.exec* functions."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence or an empty first argument is rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with a str or bytes program name.

        os.execv and os.execve are stubbed out by _execvpe_mockup(), so no
        process is replaced; only the recorded calls are inspected.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            winerror_is_sane = e.winerror is None or e.winerror != 0
            self.assertTrue(winerror_is_sane)
        else:
            self.fail('No OSError raised')
1804
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001805
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist.

    Every test relies on support.TESTFN being absent; setUp() enforces that
    precondition.
    """

    def setUp(self):
        # Fail early with a clear message if TESTFN exists, or if its state
        # cannot be determined, instead of letting the tests below pass or
        # fail for the wrong reason.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file and fails if it already exists; mkdir()
        # must then refuse to create a directory under the same name.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001840
Victor Stinnere77c9742016-03-25 10:28:23 +01001841
class TestInvalidFD(unittest.TestCase):
    """os functions taking a file descriptor must reject an invalid one."""

    # Functions that take the descriptor as their only argument; one test
    # method per name is generated below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms

    def get_single(f):
        # Build a test method for the os function named *f*; the generated
        # test does nothing when the platform lacks that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError carrying EBADF."""
        bad_fd = support.make_bad_fd()
        try:
            f(bad_fd, *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1940
Brian Curtin1b9df392010-11-24 20:24:31 +00001941
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind file1/file2 (see test_unicode_name), so clean up
        # whatever the attributes refer to at this point.
        for file in (self.file1, self.file2):
            if os.path.exists(file):
                os.unlink(file)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, check both are one file.

        The test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, which is how bytes paths are meant to be built.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1978
Serhiy Storchaka43767632013-11-03 21:31:38 +02001979@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1980class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001981 # uid_t and gid_t are 32-bit unsigned integers on Linux
1982 UID_OVERFLOW = (1 << 32)
1983 GID_OVERFLOW = (1 << 32)
1984
Serhiy Storchaka43767632013-11-03 21:31:38 +02001985 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1986 def test_setuid(self):
1987 if os.getuid() != 0:
1988 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001989 self.assertRaises(TypeError, os.setuid, 'not an int')
1990 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001991
Serhiy Storchaka43767632013-11-03 21:31:38 +02001992 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1993 def test_setgid(self):
1994 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1995 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001996 self.assertRaises(TypeError, os.setgid, 'not an int')
1997 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001998
Serhiy Storchaka43767632013-11-03 21:31:38 +02001999 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2000 def test_seteuid(self):
2001 if os.getuid() != 0:
2002 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002003 self.assertRaises(TypeError, os.setegid, 'not an int')
2004 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002005
Serhiy Storchaka43767632013-11-03 21:31:38 +02002006 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2007 def test_setegid(self):
2008 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2009 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002010 self.assertRaises(TypeError, os.setegid, 'not an int')
2011 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002012
Serhiy Storchaka43767632013-11-03 21:31:38 +02002013 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2014 def test_setreuid(self):
2015 if os.getuid() != 0:
2016 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002017 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2018 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2019 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2020 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002021
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2023 def test_setreuid_neg1(self):
2024 # Needs to accept -1. We run this in a subprocess to avoid
2025 # altering the test runner's process state (issue8045).
2026 subprocess.check_call([
2027 sys.executable, '-c',
2028 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002029
Serhiy Storchaka43767632013-11-03 21:31:38 +02002030 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2031 def test_setregid(self):
2032 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2033 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002034 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2035 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2036 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2037 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002038
Serhiy Storchaka43767632013-11-03 21:31:38 +02002039 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2040 def test_setregid_neg1(self):
2041 # Needs to accept -1. We run this in a subprocess to avoid
2042 # altering the test runner's process state (issue8045).
2043 subprocess.check_call([
2044 sys.executable, '-c',
2045 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002046
Serhiy Storchaka43767632013-11-03 21:31:38 +02002047@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2048class Pep383Tests(unittest.TestCase):
2049 def setUp(self):
2050 if support.TESTFN_UNENCODABLE:
2051 self.dir = support.TESTFN_UNENCODABLE
2052 elif support.TESTFN_NONASCII:
2053 self.dir = support.TESTFN_NONASCII
2054 else:
2055 self.dir = support.TESTFN
2056 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002057
Serhiy Storchaka43767632013-11-03 21:31:38 +02002058 bytesfn = []
2059 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002060 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002061 fn = os.fsencode(fn)
2062 except UnicodeEncodeError:
2063 return
2064 bytesfn.append(fn)
2065 add_filename(support.TESTFN_UNICODE)
2066 if support.TESTFN_UNENCODABLE:
2067 add_filename(support.TESTFN_UNENCODABLE)
2068 if support.TESTFN_NONASCII:
2069 add_filename(support.TESTFN_NONASCII)
2070 if not bytesfn:
2071 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002072
Serhiy Storchaka43767632013-11-03 21:31:38 +02002073 self.unicodefn = set()
2074 os.mkdir(self.dir)
2075 try:
2076 for fn in bytesfn:
2077 support.create_empty_file(os.path.join(self.bdir, fn))
2078 fn = os.fsdecode(fn)
2079 if fn in self.unicodefn:
2080 raise ValueError("duplicate filename")
2081 self.unicodefn.add(fn)
2082 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002083 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002084 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002085
Serhiy Storchaka43767632013-11-03 21:31:38 +02002086 def tearDown(self):
2087 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002088
Serhiy Storchaka43767632013-11-03 21:31:38 +02002089 def test_listdir(self):
2090 expected = self.unicodefn
2091 found = set(os.listdir(self.dir))
2092 self.assertEqual(found, expected)
2093 # test listdir without arguments
2094 current_directory = os.getcwd()
2095 try:
2096 os.chdir(os.sep)
2097 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2098 finally:
2099 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002100
Serhiy Storchaka43767632013-11-03 21:31:38 +02002101 def test_open(self):
2102 for fn in self.unicodefn:
2103 f = open(os.path.join(self.dir, fn), 'rb')
2104 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002105
Serhiy Storchaka43767632013-11-03 21:31:38 +02002106 @unittest.skipUnless(hasattr(os, 'statvfs'),
2107 "need os.statvfs()")
2108 def test_statvfs(self):
2109 # issue #9645
2110 for fn in self.unicodefn:
2111 # should not fail with file not found error
2112 fullname = os.path.join(self.dir, fn)
2113 os.statvfs(fullname)
2114
2115 def test_stat(self):
2116 for fn in self.unicodefn:
2117 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002118
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached on timeout or when the child exited prematurely.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and check that it stops.

        *name* is only used in the failure message.  The child signals that
        it finished initialization by writing 1 into a tagged shared-memory
        byte.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2233
2234
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0/SUB1 directories and FILE0/FILE1 files,
        # remembering the sorted entry names for the assertions below.
        entries = []
        for index in range(2):
            sub_name = 'SUB%d' % index
            data_name = 'FILE%d' % index
            data_path = os.path.join(support.TESTFN, data_name)
            os.makedirs(os.path.join(support.TESTFN, sub_name))
            with open(data_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % data_path)
            entries += [sub_name, data_name]
        self.created_paths = sorted(entries)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def _expected_bytes(self):
        # Bytes flavour of the expected directory listing.
        return [os.fsencode(entry) for entry in self.created_paths]

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listing, self.created_paths)

        # bytes
        listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listing, self._expected_bytes())

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listing = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listing, self.created_paths)

        # bytes
        listing = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(listing, self._expected_bytes())
Tim Golden781bbeb2013-10-25 20:24:06 +01002281
2282
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths match after case normalization.

        On Windows a leading extended-path prefix is dropped from either
        side before comparing.
        """
        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def without_prefix(path):
                prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
                return path[4:] if path.startswith(prefix) else path

            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        pathlike_target = FakePath(self.filelink_target)
        self.assertRaises(OSError, os.readlink, self.filelink_target)
        self.assertRaises(OSError, os.readlink, pathlike_target)

    def test_missing_link(self):
        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
        self.assertRaises(FileNotFoundError, os.readlink,
                          FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2342
2343
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic link handling on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so that the
        # preconditions are still enforced when running under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the bytes form of *link*.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            variants = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    continue
                # The link was unexpectedly created: best-effort cleanup.
                try:
                    os.remove(dest)
                except OSError:
                    pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2520
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for NTFS junctions created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so that the
        # preconditions are still enforced when running under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2554
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Tests for private helpers of the Windows-only ``nt`` module."""

    def test_getfinalpathname_handles(self):
        # Check that nt._getfinalpathname() and os.stat() do not change the
        # process handle count, on both failing and succeeding paths.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        counter = ctypes.wintypes.DWORD()

        def open_handles():
            # Current number of handles held by this process.
            succeeded = kernel.GetProcessHandleCount(hproc,
                                                     ctypes.byref(counter))
            self.assertEqual(1, succeeded)
            return counter.value

        baseline = open_handles()

        # The device-style names exercise the error path, __file__
        # exercises the success path.
        probe_names = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _round in range(10):
            for probe in probe_names:
                try:
                    nt._getfinalpathname(probe)
                except Exception:
                    # Failure is expected
                    pass
                try:
                    os.stat(probe)
                except Exception:
                    pass

        leaked = open_handles() - baseline

        self.assertEqual(0, leaked)
Tim Golden0321cf22014-05-05 19:46:17 +01002604
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Directory detection for symlinks with a link-relative target."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` is stripped under ``python -O``, which would
        # leave this test without any check at all.
        self.assertTrue(os.path.isdir(src))
2635
2636
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # Input that already has the target type is returned unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for original in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(original)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), original)
Victor Stinnere8d51452010-08-19 01:05:19 +00002650
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002651
Brett Cannonefb00c02012-02-29 18:31:31 -05002652
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # stdin is a terminal here, so an encoding name must be reported
        # and that name must be known to the codecs registry.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2666
2667
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002668class PidTests(unittest.TestCase):
2669 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2670 def test_getppid(self):
2671 p = subprocess.Popen([sys.executable, '-c',
2672 'import os; print(os.getppid())'],
2673 stdout=subprocess.PIPE)
2674 stdout, _ = p.communicate()
2675 # We are the parent of our subprocess
2676 self.assertEqual(int(stdout), os.getpid())
2677
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002678 def test_waitpid(self):
2679 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002680 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002681 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002682 status = os.waitpid(pid, 0)
2683 self.assertEqual(status, (pid, 0))
2684
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002685
class SpawnTests(unittest.TestCase):
    # Tests for the os.spawn*() family.  Every test writes a tiny script to
    # support.TESTFN that exits with a known code and checks that the spawn
    # call reports that code.

    def create_args(self, *, with_env=False, use_bytes=False):
        # Write the helper script and return the argv list used to run it.
        #
        # with_env:  also build self.env (a copy of os.environ plus a unique
        #            marker variable stored in self.key); the script then
        #            fails with KeyError unless it can read that marker.
        # use_bytes: return the arguments (and the environment, when one was
        #            built) encoded with os.fsencode().
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # No W* helpers on this platform: the status is expected to be
            # the exit code shifted left by 8 bits.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        # Shared body for the *_invalid_env tests; `spawn` is called as
        # spawn(mode, path, args, env).
        args = [sys.executable, '-c', 'pass']

        # Each of these entries must either be rejected with ValueError or
        # make the spawn report exit code 127.
        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2864
Serhiy Storchaka77703942017-06-25 07:33:01 +03002865
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # os.getlogin() must return a non-empty name.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2874
2875
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, check it took effect, then try
        # to put the original value back.
        me = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, me)
        os.setpriority(os.PRIO_PROCESS, me, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, me)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, me, base)
            except OSError as err:
                # Restoring the previous value may be refused with EACCES;
                # that is tolerated, any other failure is not.
                if err.errno != errno.EACCES:
                    raise
2898
2899
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    # Minimal asyncore-based TCP server, run in its own thread, that
    # collects whatever the connected client sends so tests can inspect it.

    class Handler(asynchat.async_chat):
        # Per-connection handler: greets the client and buffers its data.

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When False, received data is read but thrown away.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            # Return everything accumulated so far as one bytes object.
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        # Start the thread and block until run() has signalled it is active.
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        # Ask the loop in run() to finish and wait for the thread to exit.
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # "with" guarantees the lock is released even if the loop
            # iteration raises.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2984
2985
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Tests for os.sendfile(): a file containing DATA is sent over a socket
    # to a SendfileTestServer, which records what it receives.

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                # EAGAIN / EBUSY: we have to retry sending the data.
                # Anything else (including ECONNRESET, i.e. disconnected)
                # is propagated to the caller.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def _close_and_collect(self, shutdown=True):
        # Close the client side (optionally shutting the socket down first),
        # wait for the server to see the close, and return what it received.
        if shutdown:
            self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        return self.server.handler_instance.get_data()

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        data = self._close_and_collect()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        data = self._close_and_collect()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        data = self._close_and_collect()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, so it has to be passed via a dict).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        data = self._close_and_collect(shutdown=False)
        # Compare hashes rather than the large payloads themselves.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are requested, followed by
            # the two trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            data = self._close_and_collect(shutdown=False)
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003190
3191
def supports_extended_attributes():
    # Return True when os.setxattr exists and setting a "user.*" attribute
    # on a freshly created support.TESTFN succeeds.  The probe file is
    # always removed afterwards.
    if not hasattr(os, "setxattr"):
        return False

    supported = True
    try:
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003206
3207
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Exercises os.getxattr/setxattr/removexattr/listxattr through paths,
    # through follow_symlinks=False, and through file descriptors.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the whole scenario against support.TESTFN.  `s` converts the
        # attribute names (str or os.fsencode); the four callables are the
        # xattr functions under test; kwargs are forwarded to get/set/remove.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # Nothing has been set yet.
        with self.assertRaises(OSError) as cm:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE must refuse an existing attribute ...
        with self.assertRaises(OSError) as cm:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # ... and XATTR_REPLACE must refuse a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        with self.assertRaises(OSError) as cm:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)
        names = sorted("user.test{}".format(i) for i in range(100))
        for name in names:
            setxattr(path, name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each xattr function so that it operates on a file descriptor
        # opened from the path instead of on the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3291
3292
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _get_terminal_size_or_skip(self, *args):
        # Call os.get_terminal_size(*args); skip the running test when the
        # size cannot be queried, re-raise any other OSError.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            size = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3336
3337
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag.
        first_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(first_fd, -1)
        self.addCleanup(os.close, first_fd)
        self.assertFalse(os.get_inheritable(first_fd))
        # closefd=False: the descriptor is closed by the cleanup above,
        # not by the file object.
        with open(first_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created without passing any flags.
        second_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, second_fd)
        self.assertFalse(os.get_inheritable(second_fd))
3353
3354
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to os functions."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the "awkward" names provided by test.support; fall back to
        # the plain temporary file name when they are unavailable.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each case is (names to try, function, *extra positional arguments).
        cases = [
            (every_name, os.chdir),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir),
            (every_name, os.stat),
            (every_name, os.unlink),
        ]
        if on_windows:
            cases += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                (text_names, os.listdir),
            ]
        else:
            cases += [
                (every_name, os.listdir),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every_name, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                cases.append((bytes_names, os.link, b"dst"))
                cases.append((text_names, os.link, "dst"))
            else:
                cases.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every_name, os.listxattr),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            cases.append((every_name, os.readlink))

        for names, func, *extra in cases:
            for name in names:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Any other name type is expected to trigger a
                        # DeprecationWarning whose message matches
                        # 'should be'.
                        with self.assertWarnsRegex(DeprecationWarning,
                                                   'should be'):
                            func(name, *extra)
                    else:
                        func(name, *extra)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3447
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3456
Victor Stinnerdaf45552013-08-28 00:53:59 +02003457
class FDInheritanceTests(unittest.TestCase):
    """File descriptors created by the os module must be non-inheritable."""

    def _open_this_file(self):
        # Open this source file read-only and close it on test teardown.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

        os.set_inheritable(fd, True)
        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertFalse(os.get_inheritable(read_end))
        self.assertFalse(os.get_inheritable(write_end))

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    def test_dup_standard_stream(self):
        copy_fd = os.dup(1)
        self.addCleanup(os.close, copy_fd)
        self.assertGreater(copy_fd, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        copy_fd = os.dup(nul_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, target_fd), target_fd)
        self.assertTrue(os.get_inheritable(target_fd))

        # force non-inheritable
        private_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, private_fd, inheritable=False),
                         private_fd)
        self.assertFalse(os.get_inheritable(private_fd))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertFalse(os.get_inheritable(master_fd))
        self.assertFalse(os.get_inheritable(slave_fd))
3550
3551
class PathTConverterTests(unittest.TestCase):
    """Exercise the path argument conversion of several os functions."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        # Bytes variants are not exercised when os.name is 'nt'.
        if os.name != 'nt':
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        else:
            bytes_filename = None
            bytes_fspath = None
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)
        valid_paths = [
            candidate
            for candidate in (str_filename, bytes_filename, str_fspath,
                              bytes_fspath)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Function not available on this platform.
                    continue

                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object whose __fspath__() returns an int must
                # be rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        bad_values = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in bad_values:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
3616
Brett Cannon3f9183b2016-08-26 14:44:48 -07003617
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip test for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # The freshly opened descriptor is expected to be blocking.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, checking after each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3631
3632
Yury Selivanov97e2e062014-09-26 12:33:06 -04003633
class ExportsTests(unittest.TestCase):
    """Check that selected names are listed in os.__all__."""

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3638
3639
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    setUp() creates a scratch directory for each test and registers a
    cleanup that removes it again.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the scratch directory; return its path.

        The returned path is bytes when *name* is bytes, str otherwise.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        # Delegates to the module-level create_file() helper.
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory; return a {name: DirEntry} mapping.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, the st_* attributes are compared one
        by one and st_dev, st_ino and st_nlink are left out; otherwise the
        two objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if not attr.startswith("st_") or attr in ignored:
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check one DirEntry against the caller's expectations and os.stat().

        *is_dir*, *is_file* and *is_symlink* are the values the caller
        expects from the entry's is_dir(), is_file() and is_symlink()
        methods (with their default arguments).
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Check the caller's explicit expectations.  Without these the
        # checks below only prove that DirEntry agrees with os.stat(),
        # not that the entry has the type the test set up.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the only entry of the scratch directory.

        Fails the test unless the directory holds exactly one entry and
        that entry is called *name*.  A bytes *name* scans the bytes path.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create the file *name* and return the DirEntry describing it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target after the scan is what breaks the symlink.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning a file descriptor, the entry path is
                # expected to be just the entry name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3972
Victor Stinner6036e442015-03-08 01:58:04 +01003973
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol (PEP 519)."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for original in ("path/like/object", b"path/like/object"):
            wrapped = FakePath(original)

            self.assertEqual(original, self.fspath(wrapped))
            self.assertEqual(b"path/like/object", os.fsencode(wrapped))
            self.assertEqual("path/like/object", os.fsdecode(wrapped))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # None of these objects is str, bytes or path-like.
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, junk)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        broken = type('foo', (), {})
        broken.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, broken())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004019
Victor Stinnerc29b5852017-11-02 07:28:27 -07004020
class TimesTests(unittest.TestCase):
    """Sanity checks for os.times()."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every field must be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # When os.name is 'nt' these three fields are expected to be zero.
        if os.name == 'nt':
            self.assertEqual(result.children_user, 0)
            self.assertEqual(result.children_system, 0)
            self.assertEqual(result.elapsed, 0)
4035
4036
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath().

        Every test inherited from TestPEP519 is re-run with os._fspath
        in place of os.fspath.
        """

        # Overrides the hook that TestPEP519 calls as self.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004045
4046
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()