blob: 784000a2eb3668ba71f42fab3a2eadf30258c2c3 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
# Optional modules: each name is bound to None when its import fails.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
# all_users holds the pw_uid of every entry returned by pwd.getpwall(); it is
# an empty list when the import fails or an AttributeError is raised.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
# When _testcapi cannot be imported, both limits fall back to sys.maxsize.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# A missing sys.thread_info or an empty/None version counts as "not
# linuxthreads".
USING_LINUXTHREADS = (
    getattr(getattr(sys, 'thread_info', None), 'version', None) or ''
).startswith("linuxthreads")

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists.

    The skip reason reads ``requires os.<name>``.
    """
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
79
80
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so
    open() raises FileExistsError if the file already exists.
    """
    stream = open(filename, "xb", 0)
    with stream:
        stream.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests of low-level os file functions operating on support.TESTFN.

    setUp (also used as tearDown) removes support.TESTFN if it exists, so
    each test starts and ends without that file.
    """

    def setUp(self):
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            # os.read() must return exactly bytes, not a subclass
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even if a write or assertion fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args) and
        # close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        with os.fdopen(fd, *args):
            pass

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # os.open() accepts all of its parameters as keywords
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # os.symlink() accepts all of its parameters as keywords
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Re-open the destination to check what was written.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Re-open the destination to check what was written.
            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Victor Stinner47aacc82015-06-12 17:26:23 +0200594
595class UtimeTests(unittest.TestCase):
596 def setUp(self):
597 self.dirname = support.TESTFN
598 self.fname = os.path.join(self.dirname, "f1")
599
600 self.addCleanup(support.rmtree, self.dirname)
601 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100602 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200603
Victor Stinner47aacc82015-06-12 17:26:23 +0200604 def support_subsecond(self, filename):
605 # Heuristic to check if the filesystem supports timestamp with
606 # subsecond resolution: check if float and int timestamps are different
607 st = os.stat(filename)
608 return ((st.st_atime != st[7])
609 or (st.st_mtime != st[8])
610 or (st.st_ctime != st[9]))
611
612 def _test_utime(self, set_time, filename=None):
613 if not filename:
614 filename = self.fname
615
616 support_subsecond = self.support_subsecond(filename)
617 if support_subsecond:
618 # Timestamp with a resolution of 1 microsecond (10^-6).
619 #
620 # The resolution of the C internal function used by os.utime()
621 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
622 # test with a resolution of 1 ns requires more work:
623 # see the issue #15745.
624 atime_ns = 1002003000 # 1.002003 seconds
625 mtime_ns = 4005006000 # 4.005006 seconds
626 else:
627 # use a resolution of 1 second
628 atime_ns = 5 * 10**9
629 mtime_ns = 8 * 10**9
630
631 set_time(filename, (atime_ns, mtime_ns))
632 st = os.stat(filename)
633
634 if support_subsecond:
635 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
636 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
637 else:
638 self.assertEqual(st.st_atime, atime_ns * 1e-9)
639 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
640 self.assertEqual(st.st_atime_ns, atime_ns)
641 self.assertEqual(st.st_mtime_ns, mtime_ns)
642
643 def test_utime(self):
644 def set_time(filename, ns):
645 # test the ns keyword parameter
646 os.utime(filename, ns=ns)
647 self._test_utime(set_time)
648
649 @staticmethod
650 def ns_to_sec(ns):
651 # Convert a number of nanosecond (int) to a number of seconds (float).
652 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
653 # issue, os.utime() rounds towards minus infinity.
654 return (ns * 1e-9) + 0.5e-9
655
656 def test_utime_by_indexed(self):
657 # pass times as floating point seconds as the second indexed parameter
658 def set_time(filename, ns):
659 atime_ns, mtime_ns = ns
660 atime = self.ns_to_sec(atime_ns)
661 mtime = self.ns_to_sec(mtime_ns)
662 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
663 # or utime(time_t)
664 os.utime(filename, (atime, mtime))
665 self._test_utime(set_time)
666
667 def test_utime_by_times(self):
668 def set_time(filename, ns):
669 atime_ns, mtime_ns = ns
670 atime = self.ns_to_sec(atime_ns)
671 mtime = self.ns_to_sec(mtime_ns)
672 # test the times keyword parameter
673 os.utime(filename, times=(atime, mtime))
674 self._test_utime(set_time)
675
676 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
677 "follow_symlinks support for utime required "
678 "for this test.")
679 def test_utime_nofollow_symlinks(self):
680 def set_time(filename, ns):
681 # use follow_symlinks=False to test utimensat(timespec)
682 # or lutimes(timeval)
683 os.utime(filename, ns=ns, follow_symlinks=False)
684 self._test_utime(set_time)
685
686 @unittest.skipUnless(os.utime in os.supports_fd,
687 "fd support for utime required for this test.")
688 def test_utime_fd(self):
689 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100690 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200691 # use a file descriptor to test futimens(timespec)
692 # or futimes(timeval)
693 os.utime(fp.fileno(), ns=ns)
694 self._test_utime(set_time)
695
696 @unittest.skipUnless(os.utime in os.supports_dir_fd,
697 "dir_fd support for utime required for this test.")
698 def test_utime_dir_fd(self):
699 def set_time(filename, ns):
700 dirname, name = os.path.split(filename)
701 dirfd = os.open(dirname, os.O_RDONLY)
702 try:
703 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
704 os.utime(name, dir_fd=dirfd, ns=ns)
705 finally:
706 os.close(dirfd)
707 self._test_utime(set_time)
708
709 def test_utime_directory(self):
710 def set_time(filename, ns):
711 # test calling os.utime() on a directory
712 os.utime(filename, ns=ns)
713 self._test_utime(set_time, filename=self.dirname)
714
715 def _test_utime_current(self, set_time):
716 # Get the system clock
717 current = time.time()
718
719 # Call os.utime() to set the timestamp to the current system clock
720 set_time(self.fname)
721
722 if not self.support_subsecond(self.fname):
723 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700724 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200725 # On Windows, the usual resolution of time.time() is 15.6 ms.
726 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700727 #
728 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
729 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200730 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200731 st = os.stat(self.fname)
732 msg = ("st_time=%r, current=%r, dt=%r"
733 % (st.st_mtime, current, st.st_mtime - current))
734 self.assertAlmostEqual(st.st_mtime, current,
735 delta=delta, msg=msg)
736
737 def test_utime_current(self):
738 def set_time(filename):
739 # Set to the current time in the new way
740 os.utime(self.fname)
741 self._test_utime_current(set_time)
742
def test_utime_current_old(self):
    """os.utime(path, None) sets the current time (legacy spelling)."""
    def set_time(filename):
        # Set to the current time in the old explicit way.  Use the file
        # name handed in by the helper rather than reaching back to
        # self.fname.
        os.utime(filename, None)

    self._test_utime_current(set_time)
748
def get_file_system(self, path):
    """Return the file system name of the volume holding *path*.

    Only implemented for Windows (via GetVolumeInformationW); returns None
    on every other platform or when the lookup fails.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    name_buf = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        root, None, 0,
        None, None, None,
        name_buf, len(name_buf))
    return name_buf.value if succeeded else None
761
def test_large_time(self):
    """Timestamps beyond 2038 round-trip through os.utime()/os.stat()."""
    # Many filesystems are limited to the year 2038. At least, the test
    # pass with NTFS filesystem.
    file_system = self.get_file_system(self.dirname)
    if file_system != "NTFS":
        self.skipTest("requires NTFS")

    large = 5000000000   # some day in 2128
    os.utime(self.fname, (large, large))
    mtime = os.stat(self.fname).st_mtime
    self.assertEqual(mtime, large)
771
def test_utime_invalid_arguments(self):
    """os.utime() rejects malformed or unsupported argument combinations."""
    # seconds and nanoseconds parameters are mutually exclusive
    with self.assertRaises(ValueError):
        os.utime(self.fname, (5, 5), ns=(5, 5))

    # A list, a 1-tuple and a 3-tuple must all be rejected, both as the
    # positional times argument and as the ns keyword argument.
    malformed = ([5, 5], (5,), (5, 5, 5))
    for times in malformed:
        with self.assertRaises(TypeError):
            os.utime(self.fname, times)
    for ns in malformed:
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=ns)

    # Optional features must fail loudly where the platform lacks them.
    if os.utime not in os.supports_follow_symlinks:
        with self.assertRaises(NotImplementedError):
            os.utime(self.fname, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(self.fname, 'wb', 0) as fp:
            with self.assertRaises(TypeError):
                os.utime(fp.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        with self.assertRaises(NotImplementedError):
            os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200799
@support.cpython_only
def test_issue31577(self):
    """utime() must raise TypeError, not crash, on a bad ns argument."""
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def get_bad_int(divmod_ret_val):
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    # __divmod__ results that are not a 2-tuple: a plain int, an empty
    # tuple and a 3-tuple.
    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(bogus_result), 1))
815
Victor Stinner47aacc82015-06-12 17:26:23 +0200816
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000817from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000818
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so that tearDown() can restore it.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1": "VALUE1", "KEY2": "VALUE2", "KEY3": "VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        """Variables set through update() are visible to child processes."""
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
            output = popen.read()
        self.assertEqual(output.strip(), "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        """The object returned by os.popen() iterates line by line."""
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as popen:
            it = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            for item in (key, val):
                self.assertEqual(type(item), str)

    def test_items(self):
        reference = self._reference()
        for key in reference:
            self.assertEqual(os.environ.get(key), reference[key])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        actual = repr(env)
        pairs = ['{!r}: {!r}'.format(key, value)
                 for key, value in env.items()]
        self.assertEqual(actual, 'environ({{{}}})'.format(', '.join(pairs)))

    def test_get_exec_path(self):
        """os.get_exec_path() honours PATH from os.environ or a given env."""
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself with a plain dict.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                 ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        """os.environ and os.environb mirror each other's updates."""
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                fs_encoding,)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        """KeyError from os.environ carries the original key, no context."""
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        """Keep iterating *collection* after os.environ gained a key."""
        new_key = "__new_key__"
        iterator = iter(collection)

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
997
Victor Stinner6d101392013-04-14 16:35:04 +0200998
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Build the directory tree walked by every test in this class."""
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # The file is named tmp5, as in the tree above and as targeted by
        # broken_link3 below (it used to be created as "tmp3" by mistake).
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Make SUB21 unreadable.  If the directory can still be listed
        # afterwards, drop it from the tree so the expectations stay valid.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 after it was listed but before it is visited, so
        # the walk hits a directory that no longer exists.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001172
Charles-François Natali7372b062012-02-05 15:15:38 +01001173
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory fd so that the inherited WalkTests methods see
        # the same (root, dirs, files) triples as with os.walk().
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied before use; every combination of
        topdown and follow_symlinks is checked.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not mask that error with a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1241
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Map the os.fwalk()-style keyword onto the one os.walk() accepts.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (os.fsdecode(broot), dirs, files)
            # Write the caller's in-place edits (e.g. pruning) back into
            # the bytes lists that os.walk() itself consults.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1254
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Write the caller's in-place edits (e.g. pruning) back into
            # the bytes lists that os.fwalk() itself consults.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1266
Charles-François Natali7372b062012-02-05 15:15:38 +01001267
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when an assertion
        # fails, so a failure here cannot leak a changed umask into the
        # tests that run afterwards.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # tearDown() calls os.removedirs(), which cannot delete a
            # regular file; remove it here even if an assertion failed.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1360
Andrew Svetlov405faed2012-12-25 12:18:09 +02001361
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    # All tests operate on one scratch directory (support.TESTFN) that is
    # created once for the class and removed once at the end.

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Floats, complex numbers, Decimals and Fractions are not valid
        # uid/gid values even when they are numerically equal to -1.
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        # Real integers are accepted, including -1 for both ids.
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and check that each change is visible.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid

        # Switch the owner twice and check that each change is visible.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # The second call only runs when the first one did not raise.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1420
1421
class RemoveDirsTests(unittest.TestCase):
    # Each test builds support.TESTFN/dira/dirb and then checks how far up
    # the tree os.removedirs(dirb) manages to prune.

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths as (dira, dirb).
        outer = os.path.join(support.TESTFN, 'dira')
        inner = os.path.join(outer, 'dirb')
        os.mkdir(outer)
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain disappears.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for removed in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        # A file next to dirb keeps dira (and everything above it) alive.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file inside dirb makes the very first removal fail.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1461
1462
class DevNullTests(unittest.TestCase):
    # Basic read/write contract of the null device named by os.devnull.

    def test_devnull(self):
        # Unbuffered binary writes are accepted.  The with statement closes
        # the file, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading always reports end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001470
Andrew Svetlov405faed2012-12-25 12:18:09 +02001471
class URandomTests(unittest.TestCase):
    # Sanity checks for os.urandom(): output length, type and variability.

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        # Two consecutive 16-byte reads are expected to differ.
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it wrote to its standard output.
        script_lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        result = assert_python_ok('-c', '\n'.join(script_lines))
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Separate processes must not produce the same random bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001501
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001502
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    # Tests for os.getrandom(); the whole class is skipped when the
    # function exists but the running kernel rejects the syscall.

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only check that the flag exists.  os.getrandom(1, os.GRND_RANDOM)
        # is not called, to avoid consuming the rare resource /dev/random.
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive 16-byte reads are expected to differ.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1544
1545
# os.urandom() needs no file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# The flag is true as soon as one of these build-time options is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(feature) == 1
    for feature in ('HAVE_GETENTROPY',
                    'HAVE_GETRANDOM',
                    'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001552
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    # Tests for the implementation of os.urandom() that reads from a file
    # descriptor.  Every scenario runs in a child interpreter because it
    # manipulates process-wide file descriptor state.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open its device.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The two reads inside one child must differ from each other ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second child must not reproduce the first child's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1632
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001633
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stub out os.execv and os.execve while the context is active.

    Yields a list that records every call made to the stubs as a tuple
    (function name, first argument, remaining positional arguments).
    The stubs always raise, as a real exec call would normally never
    return: the execv stub raises RuntimeError and the execve stub raises
    OSError with errno ENOTDIR.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the context.  The original os.execv, os.execve and os.defpath are
    restored on exit, even when the body raises.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block so that the
    # finally clause can never refer to an unbound name (which would mask
    # the real error).
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1666
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    # Argument validation of the os.exec*() family, plus the path lookup
    # logic of the internal os._execvpe() helper (checked with stubs).

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence and an empty first argument are both
        # rejected, for tuples as well as lists.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) while os.execv/os.execve are replaced by recording
        # stubs.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is not exercised on Windows.
        flavours = (str,) if os.name == "nt" else (str, bytes)
        for flavour in flavours:
            self._test_internal_execvpe(flavour)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            winerror = exc.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1771
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001772
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test calls an os function on a path that does not exist (or, for
    # mkdir, on a path occupied by a file) and expects OSError.

    def setUp(self):
        # The tests are only meaningful when support.TESTFN does not exist,
        # so any outcome other than FileNotFoundError is a failure.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file and fails if it already exists; mkdir
        # is attempted while the file is still open.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001807
Victor Stinnere77c9742016-03-25 10:28:23 +01001808
class TestInvalidFD(unittest.TestCase):
    # Check that os functions taking a file descriptor report EBADF when
    # they are given the descriptor returned by support.make_bad_fd().

    # Names of os functions that take the file descriptor as their only
    # argument; one test method per name is generated below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately left out of singles because it doesn't raise
    # an exception on some platforms.
    def get_single(f):
        # Build a test method for the os function named f.  The generated
        # test does nothing when the function is missing on this platform.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Runs while the class body executes: every entry of singles is
    # registered as a test_<name> method in the class namespace.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False here instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # The loop stops at the first offset i for which fstat() succeeds.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        # Fewer than two usable descriptors in a row: nothing to test.
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf() is called with a file descriptor as well.
        # NOTE(review): the skip condition only checks os.fpathconf.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate() is called with a file descriptor as well.
        # NOTE(review): the skip condition only checks os.ftruncate.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1907
Brian Curtin1b9df392010-11-24 20:24:31 +00001908
class LinkTests(unittest.TestCase):
    # Tests for os.link() using str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for leftover in (self.file1, self.file2):
            if not os.path.exists(leftover):
                continue
            os.unlink(leftover)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # refer to the same open file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as source:
            with open(file2, "r") as linked:
                same = os.path.sameopenfile(source.fileno(), linked.fileno())
                self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        name1 = bytes(self.file1, sys.getfilesystemencoding())
        name2 = bytes(self.file2, sys.getfilesystemencoding())
        self._test_link(name1, name2)

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Update the attributes so that tearDown removes the new names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1945
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Error handling of the os.set*uid() / os.set*gid() functions: OSError
    # where the id 0 is refused, TypeError for non-integer arguments and
    # OverflowError for values that are too large.

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # Check seteuid itself (not setegid, which has its own test and may
        # be missing on a platform that provides seteuid).
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002013
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check that os functions accept the non-ASCII names created in setUp."""

    def setUp(self):
        """Create a scratch directory holding empty, specially named files."""
        # Use the first available special name for the directory itself.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always a candidate; the other two only when set.
        candidates = [support.TESTFN_UNICODE]
        candidates.extend(
            name
            for name in (support.TESTFN_UNENCODABLE, support.TESTFN_NONASCII)
            if name)

        # Keep only the names the filesystem encoding can represent.
        encoded_names = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before letting the error propagate.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        """Remove the scratch directory and everything in it."""
        shutil.rmtree(self.dir)

    def test_listdir(self):
        """os.listdir() returns exactly the decoded names that were created."""
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        """Every created file can be opened through its decoded name."""
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        """os.statvfs() accepts every decoded name (issue #9645)."""
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        """os.stat() accepts every decoded name."""
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002085
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Starts sys.executable as a subprocess and waits until the child
        reports through its stdout pipe that the interpreter is running.
        Then *sig* is sent with os.kill() and the child's return code is
        checked to be equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to 100 * 0.1 seconds.  The else branch runs whenever
        # the loop ends without a break: either the attempts ran out or the
        # child exited before announcing itself.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script and expect it to stop.

        *name* is only used in the failure message.  The helper script
        win_console_handler.py is started in its own process group and is
        given the tag name of a shared one-byte mmap; this test waits for
        that byte to become 1 before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over instead of leaking it.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 is falsy too, so compare against None explicitly
        # rather than relying on truthiness.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2200
2201
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        """Create two sub-directories and two files under support.TESTFN."""
        entries = []
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, filename)
            # makedirs also creates support.TESTFN itself on the first pass.
            os.makedirs(os.path.join(support.TESTFN, subdir))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n"
                             % file_path)
            entries += [subdir, filename]
        # Tests compare against sorted listings, so store the names sorted.
        self.created_paths = sorted(entries)

    def tearDown(self):
        """Remove the whole scratch tree."""
        shutil.rmtree(support.TESTFN)

    def _expected_bytes(self):
        # The created names as bytes, in the same (sorted) order.
        return [os.fsencode(name) for name in self.created_paths]

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listing, self.created_paths)

        # bytes
        listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listing, self._expected_bytes())

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listing = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listing, self.created_paths)

        # bytes
        listing = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(listing, self._expected_bytes())
Tim Golden781bbeb2013-10-25 20:24:06 +01002248
2249
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Check os.readlink() with str, bytes and path-like arguments."""

    # Link name and link target, each in a str and a bytes flavour.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        """Verify that the targets exist and the link names are free."""
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        """Reading a regular file as a link raises OSError."""
        wrapped_target = FakePath(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(wrapped_target)

    def test_missing_link(self):
        """Reading a nonexistent path raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        """A path-like object wrapping a str link is resolved."""
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        """A path-like object wrapping a bytes link yields bytes."""
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        """A bytes link name yields a bytes target."""
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2295
2296
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Check symbolic link creation, stat and removal on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        """Verify that the targets exist and the link names are free."""
        # Use unittest assertions rather than bare ``assert`` statements so
        # the preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        """Remove whichever links the test left behind."""
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the target of this link does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """Check os.stat() on a relative link from several working directories."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def _symlink_then_remove(self, src, dest, created_path):
        # Try to create the link src -> dest; a FileNotFoundError is an
        # accepted outcome.  If the link was created, remove created_path
        # on a best-effort basis.
        try:
            os.symlink(src, dest)
        except FileNotFoundError:
            return
        try:
            os.remove(created_path)
        except OSError:
            pass

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            self._symlink_then_remove(src, dest, dest)
            # Also test with bytes, since that is a separate code path.
            self._symlink_then_remove(os.fsencode(src), os.fsencode(dest),
                                      dest)
Brian Curtind40e6f72010-07-08 21:39:08 +00002456
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Check junction points created through _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        """Verify that the target exists and the junction name is free."""
        # Use unittest assertions rather than bare ``assert`` statements so
        # the preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        """Remove the junction if the test left it behind."""
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2486
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Tests for functions of the Windows-only ``nt`` module."""

    def test_getfinalpathname_handles(self):
        """nt._getfinalpathname() and os.stat() leave the handle count unchanged."""
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()
        handle_count = wintypes.DWORD()

        def open_handles():
            # Number of handles currently open in this process.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = open_handles()

        # The device-style names are meant to exercise the error path,
        # __file__ tests the success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected for some names; only leaked handles
                # matter here, so every exception is ignored.
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        pass

        handle_delta = open_handles() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002536
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check a directory symlink whose target is relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` would be stripped under -O and leave this test
        # checking nothing, so use a unittest assertion.
        self.assertTrue(os.path.isdir(src))
2567
2568
class FSEncodingTests(unittest.TestCase):
    """Check os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        """Values already of the result type are returned unchanged."""
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        """Decoding an encoded name gives back the original name."""
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Names the filesystem encoding cannot represent are skipped.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002582
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002583
Brett Cannonefb00c02012-02-29 18:31:31 -05002584
class DeviceEncodingTests(unittest.TestCase):
    """Check os.device_encoding()."""

    def test_bad_fd(self):
        """A descriptor that is not open gives None."""
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        """Descriptor 0 reports an encoding that the codecs module knows."""
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        codec_info = codecs.lookup(stdin_encoding)
        self.assertTrue(codec_info)
2598
2599
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002600class PidTests(unittest.TestCase):
2601 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2602 def test_getppid(self):
2603 p = subprocess.Popen([sys.executable, '-c',
2604 'import os; print(os.getppid())'],
2605 stdout=subprocess.PIPE)
2606 stdout, _ = p.communicate()
2607 # We are the parent of our subprocess
2608 self.assertEqual(int(stdout), os.getpid())
2609
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002610 def test_waitpid(self):
2611 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002612 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002613 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002614 status = os.waitpid(pid, 0)
2615 self.assertEqual(status, (pid, 0))
2616
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002617
Victor Stinner4659ccf2016-09-14 10:57:00 +02002618class SpawnTests(unittest.TestCase):
def create_args(self, *, with_env=False, use_bytes=False):
    """Write a small exit script and return the argv that runs it.

    The script is written to support.TESTFN, which is registered for
    removal, and exits with ``self.exitcode`` (set here to 17).

    With *with_env* true, ``self.env`` becomes a copy of os.environ
    extended with a unique key stored in ``self.key``; the script then
    reads that key from its own environment before exiting.

    With *use_bytes* true, the returned arguments are encoded with
    os.fsencode(), and so are the keys and values of ``self.env`` when
    *with_env* is also true.
    """
    self.exitcode = 17

    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)

    if not with_env:
        code = 'import sys; sys.exit(%s)' % self.exitcode
    else:
        self.env = dict(os.environ)
        # create an unique key
        self.key = str(uuid.uuid4())
        self.env[self.key] = self.key
        # read the variable from os.environ to check that it exists
        code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                % (self.key, self.exitcode))

    with open(filename, "w") as fp:
        fp.write(code)

    args = [sys.executable, filename]
    if use_bytes:
        args = [os.fsencode(a) for a in args]
        # self.env only exists when with_env was requested; encoding it
        # unconditionally raised AttributeError for use_bytes alone.
        if with_env:
            self.env = {os.fsencode(k): os.fsencode(v)
                        for k, v in self.env.items()}

    return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002646
@requires_os_func('spawnl')
def test_spawnl(self):
    """os.spawnl() runs the script and returns its exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnl(os.P_WAIT, program, *argv), self.exitcode)
2652
@requires_os_func('spawnle')
def test_spawnle(self):
    """os.spawnle() runs the script with the prepared environment."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    # The environment mapping is the last positional argument.
    status = os.spawnle(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2658
@requires_os_func('spawnlp')
def test_spawnlp(self):
    """os.spawnlp() runs the script and returns its exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnlp(os.P_WAIT, program, *argv), self.exitcode)
2664
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
    """os.spawnlpe() runs the script with the prepared environment."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    # The environment mapping is the last positional argument.
    status = os.spawnlpe(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2670
@requires_os_func('spawnv')
def test_spawnv(self):
    """os.spawnv() runs the script and returns its exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnv(os.P_WAIT, program, argv), self.exitcode)
2676
@requires_os_func('spawnve')
def test_spawnve(self):
    """os.spawnve() runs the script with the prepared environment."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2682
@requires_os_func('spawnvp')
def test_spawnvp(self):
    """os.spawnvp() runs the script and returns its exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnvp(os.P_WAIT, program, argv), self.exitcode)
2688
@requires_os_func('spawnvpe')
def test_spawnvpe(self):
    """os.spawnvpe() runs the script with the prepared environment."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnvpe(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2694
@requires_os_func('spawnv')
def test_nowait(self):
    """A P_NOWAIT child can be reaped and reports the expected exit code."""
    argv = self.create_args()
    pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
    reaped_pid, status = os.waitpid(pid, 0)
    self.assertEqual(reaped_pid, pid)
    if not hasattr(os, 'WIFEXITED'):
        # Without the W* helpers, the exit code is expected in the bits
        # above the low byte of the status.
        self.assertEqual(status, self.exitcode << 8)
    else:
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2707
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
    """os.spawnve() accepts bytes arguments and a bytes environment."""
    # Test bytes handling in parse_arglist and parse_envlist (#28114)
    argv = self.create_args(with_env=True, use_bytes=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2714
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
    # os.spawnl() must raise ValueError when given no program arguments
    # or an empty first argument.
    program = self.create_args()[0]
    with self.assertRaises(ValueError):
        os.spawnl(os.P_NOWAIT, program)
    with self.assertRaises(ValueError):
        os.spawnl(os.P_NOWAIT, program, '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002720
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
    # os.spawnle() must raise ValueError when given no program arguments
    # or an empty first argument (the trailing {} is the environment).
    program = self.create_args()[0]
    with self.assertRaises(ValueError):
        os.spawnle(os.P_NOWAIT, program, {})
    with self.assertRaises(ValueError):
        os.spawnle(os.P_NOWAIT, program, '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002726
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
    # os.spawnv() must raise ValueError for an empty argument sequence
    # and for one whose first item is an empty string, whether it is
    # given as a tuple or as a list.
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnv(os.P_NOWAIT, program, bad_argv)
Steve Dower859fd7b2016-11-19 18:53:19 -08002734
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
    # os.spawnve() must raise ValueError for an empty argument sequence
    # and for one whose first item is an empty string, whether it is
    # given as a tuple or as a list.  The environment is an empty dict.
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnve(os.P_NOWAIT, program, bad_argv, {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002742
def _test_invalid_env(self, spawn):
    """Exercise *spawn* with malformed and unusual environments.

    *spawn* is called as spawn(mode, path, args, env).  For each of the
    three invalid environments the call may either raise ValueError or
    return exit code 127.  An '=' inside a variable *value* is valid and
    must reach the child unchanged.
    """
    argv = [sys.executable, '-c', 'pass']

    invalid_entries = (
        # null character in the environment variable name
        ("FRUIT\0VEGETABLE", "cabbage"),
        # null character in the environment variable value
        ("FRUIT", "orange\0VEGETABLE=cabbage"),
        # equal character in the environment variable name
        ("FRUIT=ORANGE", "lemon"),
    )
    for var_name, var_value in invalid_entries:
        env = os.environ.copy()
        env[var_name] = var_value
        try:
            status = spawn(os.P_WAIT, argv[0], argv, env)
        except ValueError:
            continue
        self.assertEqual(status, 127)

    # equal character in the environment variable value
    script = support.TESTFN
    self.addCleanup(support.unlink, script)
    with open(script, "w") as fp:
        fp.write('import sys, os\n'
                 'if os.getenv("FRUIT") != "orange=lemon":\n'
                 ' raise AssertionError')
    argv = [sys.executable, script]
    env = os.environ.copy()
    env["FRUIT"] = "orange=lemon"
    status = spawn(os.P_WAIT, argv[0], argv, env)
    self.assertEqual(status, 0)
2788
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
    # Run the shared invalid-environment scenario against os.spawnve().
    spawn = os.spawnve
    self._test_invalid_env(spawn)
2792
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
    # Run the shared invalid-environment scenario against os.spawnvpe().
    spawn = os.spawnvpe
    self._test_invalid_env(spawn)
2796
Serhiy Storchaka77703942017-06-25 07:33:01 +03002797
Brian Curtin0151b8e2010-09-24 13:43:43 +00002798# The introduction of this TestCase caused at least two different errors on
2799# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Sanity check for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name returned by os.getlogin() must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2806
2807
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump the priority value of this process by one, check that the
        # new value is reported back, then try to restore the old one.
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, original + 1)
        try:
            current = os.getpriority(os.PRIO_PROCESS, pid)
            if original >= 19 and current <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % original)
            self.assertEqual(current, original + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as err:
                # Restoring the old value may be refused with EACCES;
                # that is tolerated, anything else is re-raised.
                if err.errno != errno.EACCES:
                    raise
2830
2831
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """asyncore-based TCP server that runs its event loop in a thread.

    The server listens on *address*.  Each accepted connection is wrapped
    in a Handler, which greets the peer and records what it receives; the
    most recent handler is available as ``handler_instance``.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that accumulates the received bytes."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, incoming data is still read but not stored.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Actual address in use (the port may have been chosen by the OS).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True while the server loop is (supposed to be) running."""
        return self._active

    def start(self):
        """Start the server thread and block until its loop has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the server loop to exit and wait for the thread to finish."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # "with" releases the lock even when the loop iteration raises
            # (handle_error() re-raises), so it is never left held.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2916
2917
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending a file to a SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    @staticmethod
    def _stop_server(server):
        # Stop *server* unless a test or tearDown() already did so.
        if server.running:
            server.stop()

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        # unittest does not call tearDown() when setUp() raises, so every
        # resource is also registered for cleanup as soon as it exists.
        # After a normal tearDown() these cleanups are harmless no-ops.
        self.addCleanup(self._stop_server, self.server)
        self.client = socket.socket()
        self.addCleanup(self.client.close)
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        All arguments are passed to os.sendfile().  The call is retried
        for as long as it fails with EAGAIN or EBUSY; any other OSError
        (including ECONNRESET, i.e. disconnected) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # otherwise we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the lengths first, then the payload itself.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must fail with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, hence the dict unpacking).
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Send the first 5 bytes of the file followed by both trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # With SF_NODISKIO the call may fail with EBUSY or EAGAIN; both
        # are accepted, any other error is re-raised.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003122
3123
def supports_extended_attributes():
    """Return True if an extended attribute can be set on a new test file.

    Returns False when os.setxattr is missing or when setting the
    attribute fails with OSError.  The probe file (support.TESTFN) is
    created exclusively and always removed afterwards.
    """
    if hasattr(os, "setxattr"):
        supported = True
        try:
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    supported = False
        finally:
            support.unlink(support.TESTFN)
        return supported

    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07003138
3139
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr(), os.setxattr(), os.removexattr() and
    os.listxattr()."""

    def _assert_oserror(self, expected_errno, func, *args, **kwargs):
        # Call func(*args, **kwargs) and require that it raises an OSError
        # whose errno is *expected_errno*.
        with self.assertRaises(OSError) as cm:
            func(*args, **kwargs)
        self.assertEqual(cm.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Drive the four xattr callables through a create / replace /
        # remove scenario on a freshly created file.  *s* converts the
        # attribute names (str or os.fsencode); **kwargs is forwarded to
        # getxattr, setxattr and removexattr, but not to listxattr.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # Nothing has been set yet.
        self._assert_oserror(errno.ENODATA,
                             getxattr, path, s("user.test"), **kwargs)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE must refuse an attribute that already exists ...
        self._assert_oserror(errno.EEXIST,
                             setxattr, path, s("user.test"), b"bye",
                             os.XATTR_CREATE, **kwargs)

        # ... and XATTR_REPLACE one that does not exist yet.
        self._assert_oserror(errno.ENODATA,
                             setxattr, path, s("user.test2"), b"bye",
                             os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        self._assert_oserror(errno.ENODATA,
                             getxattr, path, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario twice, with str and then with bytes attribute
        # names, removing the test file after each run.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3223
3224
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked portably, so only
        require that both dimensions are non-negative.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  When stty succeeds, get_terminal_size() is expected
        to work as well.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            size = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        # stty prints the two numbers in the reverse order of the tuple
        # returned by get_terminal_size().
        expected = (int(size[1]), int(size[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3268
3269
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # A descriptor created with an explicit MFD_CLOEXEC flag must be
        # valid, non-inheritable and writable.
        flagged_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged_fd, -1)
        self.addCleanup(os.close, flagged_fd)
        self.assertFalse(os.get_inheritable(flagged_fd))
        # closefd=False: the descriptor is closed by the cleanup above.
        with open(flagged_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Without explicit flags the descriptor must be non-inheritable too.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3285
3286
class OSErrorTests(unittest.TestCase):
    """Check the ``filename`` attribute of OSError raised by os functions."""

    def setUp(self):
        # Trivial str subclass; an instance of it is used as a file name.
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded,
                                bytearray(encoded),
                                memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each entry is (file names to try, function, extra arguments...).
        # Every call is expected to fail: the names built in setUp() are
        # presumably never created on disk -- verify against the suite.
        on_windows = sys.platform == "win32"
        every = self.filenames
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                (self.unicode_filenames, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions, added only when this platform provides them.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra_args in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra_args))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((self.bytes_filenames, os.link, b"dst"))
                funcs.append((self.unicode_filenames, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            names = self.unicode_filenames if on_windows else every
            funcs.append((names, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Other name types (bytearray, memoryview) must
                        # additionally trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                    else:
                        func(name, *func_args)
                except OSError as err:
                    # The exception must carry the very object passed in.
                    self.assertIs(err.filename, name, str(func))
                    continue
                except UnicodeDecodeError:
                    continue
                self.fail("No exception thrown by {}".format(func))
3382
class CPUCountTests(unittest.TestCase):
    """Sanity check for the value reported by os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        # None means the count is unknown, so there is nothing to verify.
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3391
Victor Stinnerdaf45552013-08-28 00:53:59 +02003392
class FDInheritanceTests(unittest.TestCase):
    """Checks the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed by a
        # cleanup registered on the running test.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    @staticmethod
    def _cloexec_bit(fd):
        # FD_CLOEXEC bit of the descriptor flags (0 when the bit is clear).
        return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(self._cloexec_bit(fd), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(self._cloexec_bit(fd), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_ends = os.pipe()
        for end in pipe_ends:
            self.addCleanup(os.close, end)
        for end in pipe_ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        copy_fd = os.dup(nul_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, target_fd), target_fd)
        self.assertTrue(os.get_inheritable(target_fd))

        # force non-inheritable
        private_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, private_fd, inheritable=False),
                         private_fd)
        self.assertFalse(os.get_inheritable(private_fd))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_ends = os.openpty()
        for end in pty_ends:
            self.addCleanup(os.close, end)
        for end in pty_ends:
            self.assertEqual(os.get_inheritable(end), False)
3480
3481
class PathTConverterTests(unittest.TestCase):
    """Checks how os functions accept str, bytes, path-like and fd arguments."""

    # Each entry is (os function name, whether a file descriptor is accepted,
    # extra positional arguments, cleanup callable for the result or None).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    @staticmethod
    def _call_and_release(fn, target, extra_args, cleanup_fn):
        # Call fn on target and hand the result to cleanup_fn when one is given.
        outcome = fn(target, *extra_args)
        if cleanup_fn is not None:
            cleanup_fn(outcome)

    def test_path_t_converter(self):
        text_name = support.TESTFN
        # The bytes variants are not exercised when os.name is 'nt'.
        on_nt = os.name == 'nt'
        raw_name = None if on_nt else support.TESTFN.encode('ascii')
        raw_pathlike = None if on_nt else FakePath(raw_name)

        fd = os.open(FakePath(text_name), os.O_WRONLY | os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        fd_pathlike = FakePath(fd)
        text_pathlike = FakePath(text_name)
        candidates = [
            candidate
            for candidate in (text_name, raw_name, text_pathlike, raw_pathlike)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # this platform's os module lacks the function
                    continue

                for path in candidates:
                    with self.subTest(name=name, path=path):
                        self._call_and_release(fn, path, extra_args,
                                               cleanup_fn)

                # A path-like object whose __fspath__() gives back an int
                # must be rejected.
                with self.assertRaisesRegex(TypeError,
                                            'to return str or bytes'):
                    fn(fd_pathlike, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    # should not fail
                    self._call_and_release(fn, fd, extra_args, cleanup_fn)

    def test_path_t_converter_and_custom_class(self):
        template = r'__fspath__\(\) to return str or bytes, not %s'
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in cases:
            with self.assertRaisesRegex(TypeError, template % type_name):
                os.stat(FakePath(value))
3546
Brett Cannon3f9183b2016-08-26 14:44:48 -07003547
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trips the blocking mode of a descriptor through os."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # The freshly opened descriptor is expected to report blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the mode after each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3561
3562
Yury Selivanov97e2e062014-09-26 12:33:06 -04003563
class ExportsTests(unittest.TestCase):
    """Checks that selected names are listed in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3568
3569
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it produces.

    setUp() creates a scratch directory at the real path of support.TESTFN
    and registers its removal; each test populates that directory itself.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        # bytes spelling of the same directory, used by the bytes-path tests
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the scratch directory and return its path.

        The directory is joined as bytes when *name* is bytes and as str
        otherwise, so the returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        # The bare name resolves to the module-level create_file() helper,
        # not to this method.
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        Asserts that the sorted entry names are equal to *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results agree.

        When *skip_fields* is true, the st_* attributes are compared one by
        one and st_dev, st_ino and st_nlink are left out; otherwise the two
        objects are compared as a whole.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Validate one DirEntry of the scratch directory.

        *name* is the expected entry name.  *is_dir*, *is_file* and
        *is_symlink* are the values the caller expects from the entry's
        methods of the same name (called with their default arguments).
        The entry is also cross-checked against os.stat() of its path.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Check the caller's expectations.  Without these assertions the
        # expected flags would go unverified and a wrong expectation would
        # pass silently.
        self.assertEqual(entry.is_dir(), is_dir, name)
        self.assertEqual(entry.is_file(), is_file, name)
        self.assertEqual(entry.is_symlink(), is_symlink, name)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # The last argument is skip_fields: when os.name is 'nt' the
        # comparison leaves out st_dev, st_ino and st_nlink (for the
        # symlink-following stat, only if the entry is not a symlink).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # get_entries() compares against the sorted listing, so the names
        # are collected in sorted order.
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the only entry of the scratch directory.

        The directory is scanned as bytes when *name* is bytes.  Asserts
        that exactly one entry exists and that it is called *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create the file *name* and return the DirEntry scanned for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs in that case
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # removing the target leaves the symlink dangling
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # when scanning a descriptor, path is expected to be just
                # the entry name
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that was never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3902
Victor Stinner6036e442015-03-08 01:58:04 +01003903
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Looked up through self in every test so a subclass can substitute the
    # pure Python implementation when a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for wrapped_value in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(wrapped_value)

            self.assertEqual(wrapped_value, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        tagged = FakePath('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(tagged))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(junk)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        wrong_type = FakePath(42)
        with self.assertRaises(TypeError):
            self.fspath(wrong_type)
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})()
        with self.assertRaises(TypeError):
            self.fspath(not_callable)
        # __fspath__ raises an exception.
        raising = FakePath(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(raising)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003949
Victor Stinnerc29b5852017-11-02 07:28:27 -07003950
class TimesTests(unittest.TestCase):
    """Checks the type and fields of the os.times() result."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # When os.name is 'nt' these three fields are expected to be zero.
        if os.name == 'nt':
            for field_name in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(result, field_name), 0)
3965
3966
# Only define this class when os exposes _fspath, i.e. when a C version of
# os.fspath() is provided; otherwise TestPEP519 has already tested the pure
# Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Every inherited test calls self.fspath, so overriding it here
        # re-runs the whole TestPEP519 suite against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003975
3976
# Hand control to unittest's command-line runner when this file is executed
# as a script.
if __name__ == "__main__":
    unittest.main()