blob: 2e73906f93c3ee3e1cb5a3eeeaaccc69d1688d59 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
# True only when os.geteuid() exists and reports the root user (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# The platform check comes first so os.getgid() is only called on FreeBSD.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    available = hasattr(os, name)
    reason = f'requires os.{name}'
    return unittest.skipUnless(available, reason)
79
80
def create_file(filename, content=b'content'):
    """Create *filename* holding the bytes *content*.

    The file is opened in exclusive-creation mode, so FileExistsError is
    raised when *filename* already exists.
    """
    # buffering=0: unbuffered binary stream
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
109 with support.change_cwd(tmpdir):
110 path = tmpdir
111 expected = path
112
113 while True:
114 cwd = os.getcwd()
115 self.assertEqual(cwd, expected)
116
117 need = min_len - (len(cwd) + len(os.path.sep))
118 if need <= 0:
119 break
120 if len(dirname) > need and need > 0:
121 dirname = dirname[:need]
122
123 path = os.path.join(path, dirname)
124 try:
125 os.mkdir(path)
126 # On Windows, chdir() can fail
127 # even if mkdir() succeeded
128 os.chdir(path)
129 except FileNotFoundError:
130 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
131 # ERROR_FILENAME_EXCED_RANGE (206) errors
132 # ("The filename or extension is too long")
133 break
134 except OSError as exc:
135 if exc.errno == errno.ENAMETOOLONG:
136 break
137 else:
138 raise
139
140 expected = path
141
142 if support.verbose:
143 print(f"Tested current directory length: {len(cwd)}")
144
Victor Stinner689830e2019-06-26 17:31:12 +0200145 def test_getcwdb(self):
146 cwd = os.getcwdb()
147 self.assertIsInstance(cwd, bytes)
148 self.assertEqual(os.fsdecode(cwd), os.getcwd())
149
150
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the low-level file functions of os, run against TESTFN."""

    def setUp(self):
        # lexists() also reports a dangling symlink, such as the one
        # test_symlink_keywords may leave behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the refcount of its argument
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            # Exactly bytes is expected, not a subclass
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command *args* and require a zero exit status
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(),
        # forwarding *args* (mode, buffering) to it
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: close first, then unlink
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: close first, then unlink
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200374
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000375# Test attributes on return values from os.*stat* family.
376class StatAttributeTests(unittest.TestCase):
377 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200378 self.fname = support.TESTFN
379 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100380 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000381
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000383 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000384
385 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000386 self.assertEqual(result[stat.ST_SIZE], 3)
387 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000389 # Make sure all the attributes are there
390 members = dir(result)
391 for name in dir(stat):
392 if name[:3] == 'ST_':
393 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000394 if name.endswith("TIME"):
395 def trunc(x): return int(x)
396 else:
397 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000398 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000399 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000400 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000401
Larry Hastings6fe20b32012-04-19 15:07:49 -0700402 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700403 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700404 for name in 'st_atime st_mtime st_ctime'.split():
405 floaty = int(getattr(result, name) * 100000)
406 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700407 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700408
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000409 try:
410 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200411 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000412 except IndexError:
413 pass
414
415 # Make sure that assignment fails
416 try:
417 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200418 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000419 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000420 pass
421
422 try:
423 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200424 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000425 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000426 pass
427
428 try:
429 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200430 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000431 except AttributeError:
432 pass
433
434 # Use the stat_result constructor with a too-short tuple.
435 try:
436 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200437 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000438 except TypeError:
439 pass
440
Ezio Melotti42da6632011-03-15 05:18:48 +0200441 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000442 try:
443 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
444 except TypeError:
445 pass
446
Antoine Pitrou38425292010-09-21 18:19:07 +0000447 def test_stat_attributes(self):
448 self.check_stat_attributes(self.fname)
449
450 def test_stat_attributes_bytes(self):
451 try:
452 fname = self.fname.encode(sys.getfilesystemencoding())
453 except UnicodeEncodeError:
454 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700455 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000456
Christian Heimes25827622013-10-12 01:27:08 +0200457 def test_stat_result_pickle(self):
458 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200459 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
460 p = pickle.dumps(result, proto)
461 self.assertIn(b'stat_result', p)
462 if proto < 4:
463 self.assertIn(b'cos\nstat_result\n', p)
464 unpickled = pickle.loads(p)
465 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200466
Serhiy Storchaka43767632013-11-03 21:31:38 +0200467 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000468 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700469 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000470
471 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000472 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000473
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000474 # Make sure all the attributes are there.
475 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
476 'ffree', 'favail', 'flag', 'namemax')
477 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000478 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000479
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100480 self.assertTrue(isinstance(result.f_fsid, int))
481
482 # Test that the size of the tuple doesn't change
483 self.assertEqual(len(result), 10)
484
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000485 # Make sure that assignment really fails
486 try:
487 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200488 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000489 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000490 pass
491
492 try:
493 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200494 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000495 except AttributeError:
496 pass
497
498 # Use the constructor with a too-short tuple.
499 try:
500 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200501 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000502 except TypeError:
503 pass
504
Ezio Melotti42da6632011-03-15 05:18:48 +0200505 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000506 try:
507 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
508 except TypeError:
509 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000510
Christian Heimes25827622013-10-12 01:27:08 +0200511 @unittest.skipUnless(hasattr(os, 'statvfs'),
512 "need os.statvfs()")
513 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700514 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200515
Serhiy Storchakabad12572014-12-15 14:03:42 +0200516 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
517 p = pickle.dumps(result, proto)
518 self.assertIn(b'statvfs_result', p)
519 if proto < 4:
520 self.assertIn(b'cos\nstatvfs_result\n', p)
521 unpickled = pickle.loads(p)
522 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200523
Serhiy Storchaka43767632013-11-03 21:31:38 +0200524 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
525 def test_1686475(self):
526 # Verify that an open file can be stat'ed
527 try:
528 os.stat(r"c:\pagefile.sys")
529 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600530 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200531 except OSError as e:
532 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000533
Serhiy Storchaka43767632013-11-03 21:31:38 +0200534 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
535 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
536 def test_15261(self):
537 # Verify that stat'ing a closed fd does not cause crash
538 r, w = os.pipe()
539 try:
540 os.stat(r) # should not raise error
541 finally:
542 os.close(r)
543 os.close(w)
544 with self.assertRaises(OSError) as ctx:
545 os.stat(r)
546 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100547
Zachary Ware63f277b2014-06-19 09:46:37 -0500548 def check_file_attributes(self, result):
549 self.assertTrue(hasattr(result, 'st_file_attributes'))
550 self.assertTrue(isinstance(result.st_file_attributes, int))
551 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
552
553 @unittest.skipUnless(sys.platform == "win32",
554 "st_file_attributes is Win32 specific")
555 def test_file_attributes(self):
556 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
557 result = os.stat(self.fname)
558 self.check_file_attributes(result)
559 self.assertEqual(
560 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
561 0)
562
563 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200564 dirname = support.TESTFN + "dir"
565 os.mkdir(dirname)
566 self.addCleanup(os.rmdir, dirname)
567
568 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500569 self.check_file_attributes(result)
570 self.assertEqual(
571 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
572 stat.FILE_ATTRIBUTE_DIRECTORY)
573
Berker Peksag0b4dc482016-09-17 15:49:59 +0300574 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
575 def test_access_denied(self):
576 # Default to FindFirstFile WIN32_FIND_DATA when access is
577 # denied. See issue 28075.
578 # os.environ['TEMP'] should be located on a volume that
579 # supports file ACLs.
580 fname = os.path.join(os.environ['TEMP'], self.fname)
581 self.addCleanup(support.unlink, fname)
582 create_file(fname, b'ABC')
583 # Deny the right to [S]YNCHRONIZE on the file to
584 # force CreateFile to fail with ERROR_ACCESS_DENIED.
585 DETACHED_PROCESS = 8
586 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500587 # bpo-30584: Use security identifier *S-1-5-32-545 instead
588 # of localized "Users" to not depend on the locale.
589 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300590 creationflags=DETACHED_PROCESS
591 )
592 result = os.stat(fname)
593 self.assertNotEqual(result.st_size, 0)
594
Victor Stinner47aacc82015-06-12 17:26:23 +0200595
596class UtimeTests(unittest.TestCase):
597 def setUp(self):
598 self.dirname = support.TESTFN
599 self.fname = os.path.join(self.dirname, "f1")
600
601 self.addCleanup(support.rmtree, self.dirname)
602 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100603 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200604
Victor Stinner47aacc82015-06-12 17:26:23 +0200605 def support_subsecond(self, filename):
606 # Heuristic to check if the filesystem supports timestamp with
607 # subsecond resolution: check if float and int timestamps are different
608 st = os.stat(filename)
609 return ((st.st_atime != st[7])
610 or (st.st_mtime != st[8])
611 or (st.st_ctime != st[9]))
612
613 def _test_utime(self, set_time, filename=None):
614 if not filename:
615 filename = self.fname
616
617 support_subsecond = self.support_subsecond(filename)
618 if support_subsecond:
619 # Timestamp with a resolution of 1 microsecond (10^-6).
620 #
621 # The resolution of the C internal function used by os.utime()
622 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
623 # test with a resolution of 1 ns requires more work:
624 # see the issue #15745.
625 atime_ns = 1002003000 # 1.002003 seconds
626 mtime_ns = 4005006000 # 4.005006 seconds
627 else:
628 # use a resolution of 1 second
629 atime_ns = 5 * 10**9
630 mtime_ns = 8 * 10**9
631
632 set_time(filename, (atime_ns, mtime_ns))
633 st = os.stat(filename)
634
635 if support_subsecond:
636 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
637 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
638 else:
639 self.assertEqual(st.st_atime, atime_ns * 1e-9)
640 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
641 self.assertEqual(st.st_atime_ns, atime_ns)
642 self.assertEqual(st.st_mtime_ns, mtime_ns)
643
644 def test_utime(self):
645 def set_time(filename, ns):
646 # test the ns keyword parameter
647 os.utime(filename, ns=ns)
648 self._test_utime(set_time)
649
650 @staticmethod
651 def ns_to_sec(ns):
652 # Convert a number of nanosecond (int) to a number of seconds (float).
653 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
654 # issue, os.utime() rounds towards minus infinity.
655 return (ns * 1e-9) + 0.5e-9
656
657 def test_utime_by_indexed(self):
658 # pass times as floating point seconds as the second indexed parameter
659 def set_time(filename, ns):
660 atime_ns, mtime_ns = ns
661 atime = self.ns_to_sec(atime_ns)
662 mtime = self.ns_to_sec(mtime_ns)
663 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
664 # or utime(time_t)
665 os.utime(filename, (atime, mtime))
666 self._test_utime(set_time)
667
668 def test_utime_by_times(self):
669 def set_time(filename, ns):
670 atime_ns, mtime_ns = ns
671 atime = self.ns_to_sec(atime_ns)
672 mtime = self.ns_to_sec(mtime_ns)
673 # test the times keyword parameter
674 os.utime(filename, times=(atime, mtime))
675 self._test_utime(set_time)
676
677 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
678 "follow_symlinks support for utime required "
679 "for this test.")
680 def test_utime_nofollow_symlinks(self):
681 def set_time(filename, ns):
682 # use follow_symlinks=False to test utimensat(timespec)
683 # or lutimes(timeval)
684 os.utime(filename, ns=ns, follow_symlinks=False)
685 self._test_utime(set_time)
686
687 @unittest.skipUnless(os.utime in os.supports_fd,
688 "fd support for utime required for this test.")
689 def test_utime_fd(self):
690 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100691 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200692 # use a file descriptor to test futimens(timespec)
693 # or futimes(timeval)
694 os.utime(fp.fileno(), ns=ns)
695 self._test_utime(set_time)
696
697 @unittest.skipUnless(os.utime in os.supports_dir_fd,
698 "dir_fd support for utime required for this test.")
699 def test_utime_dir_fd(self):
700 def set_time(filename, ns):
701 dirname, name = os.path.split(filename)
702 dirfd = os.open(dirname, os.O_RDONLY)
703 try:
704 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
705 os.utime(name, dir_fd=dirfd, ns=ns)
706 finally:
707 os.close(dirfd)
708 self._test_utime(set_time)
709
710 def test_utime_directory(self):
711 def set_time(filename, ns):
712 # test calling os.utime() on a directory
713 os.utime(filename, ns=ns)
714 self._test_utime(set_time, filename=self.dirname)
715
716 def _test_utime_current(self, set_time):
717 # Get the system clock
718 current = time.time()
719
720 # Call os.utime() to set the timestamp to the current system clock
721 set_time(self.fname)
722
723 if not self.support_subsecond(self.fname):
724 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700725 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200726 # On Windows, the usual resolution of time.time() is 15.6 ms.
727 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700728 #
729 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
730 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200731 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200732 st = os.stat(self.fname)
733 msg = ("st_time=%r, current=%r, dt=%r"
734 % (st.st_mtime, current, st.st_mtime - current))
735 self.assertAlmostEqual(st.st_mtime, current,
736 delta=delta, msg=msg)
737
738 def test_utime_current(self):
739 def set_time(filename):
740 # Set to the current time in the new way
741 os.utime(self.fname)
742 self._test_utime_current(set_time)
743
744 def test_utime_current_old(self):
745 def set_time(filename):
746 # Set to the current time in the old explicit way.
747 os.utime(self.fname, None)
748 self._test_utime_current(set_time)
749
def get_file_system(self, path):
    # Return the filesystem name that GetVolumeInformationW reports for
    # the drive holding *path*.  Only Windows is probed; on every other
    # platform, or when the call fails, the result is None.
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    name_buf = ctypes.create_unicode_buffer("", 100)
    kernel32 = ctypes.windll.kernel32
    found = kernel32.GetVolumeInformationW(root, None, 0,
                                           None, None, None,
                                           name_buf, len(name_buf))
    return name_buf.value if found else None
762
763 def test_large_time(self):
764 # Many filesystems are limited to the year 2038. At least, the test
765 # pass with NTFS filesystem.
766 if self.get_file_system(self.dirname) != "NTFS":
767 self.skipTest("requires NTFS")
768
769 large = 5000000000 # some day in 2128
770 os.utime(self.fname, (large, large))
771 self.assertEqual(os.stat(self.fname).st_mtime, large)
772
773 def test_utime_invalid_arguments(self):
774 # seconds and nanoseconds parameters are mutually exclusive
775 with self.assertRaises(ValueError):
776 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200777 with self.assertRaises(TypeError):
778 os.utime(self.fname, [5, 5])
779 with self.assertRaises(TypeError):
780 os.utime(self.fname, (5,))
781 with self.assertRaises(TypeError):
782 os.utime(self.fname, (5, 5, 5))
783 with self.assertRaises(TypeError):
784 os.utime(self.fname, ns=[5, 5])
785 with self.assertRaises(TypeError):
786 os.utime(self.fname, ns=(5,))
787 with self.assertRaises(TypeError):
788 os.utime(self.fname, ns=(5, 5, 5))
789
790 if os.utime not in os.supports_follow_symlinks:
791 with self.assertRaises(NotImplementedError):
792 os.utime(self.fname, (5, 5), follow_symlinks=False)
793 if os.utime not in os.supports_fd:
794 with open(self.fname, 'wb', 0) as fp:
795 with self.assertRaises(TypeError):
796 os.utime(fp.fileno(), (5, 5))
797 if os.utime not in os.supports_dir_fd:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200800
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def get_bad_int(divmod_ret_val):
        # Build an object whose __divmod__ hands back the given value.
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    # Each of these __divmod__ results must be rejected with TypeError:
    # a non-tuple, an empty tuple and a 3-tuple.
    for bogus in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(bogus), 1))
816
Victor Stinner47aacc82015-06-12 17:26:23 +0200817
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000818from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000819
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes view where available) so
        # tearDown can restore it, then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Put back exactly what setUp saved.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as popen:
            it = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        for name, text in self._reference().items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered = ', '.join('{!r}: {!r}'.format(key, value)
                             for key, value in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(rendered))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            bytes_envs = (
                {b'PATH': b'abc'},
                {b'PATH': 'abc'},
                {'PATH': b'abc'},
            )
            for env in bytes_envs:
                self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_bytes = text.encode(encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (encoding,)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_text = raw.decode(encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_text)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected = 'key=', OSError
        self.assertRaises(expected, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup: the KeyError carries the very key object that was passed
        # in and has its context suppressed.
        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

        # Deletion: same expectations.
        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating *collection*, add a key to os.environ, then keep
        # iterating: the second next() must not raise.
        iterator = iter(collection)
        new_key = "__new_key__"
        new_value = "test_environ_iteration"

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = new_value

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], new_value)
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
998
Victor Stinner6d101392013-04-14 16:35:04 +0200999
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to tests both functions with the same code base
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # Named "tmp5" to match the tree above and the target of
        # broken_link3 created below.
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Make SUB21 unreadable.  If the directory can still be listed
        # afterwards, drop it from the tree and from the expected result.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Restore access at teardown so the tree can be removed.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        walked[0][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        walked[3][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after the walk has already listed it.  The walk must
        # report an error for it and still visit the other directories.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001173
Charles-François Natali7372b062012-02-05 15:15:38 +01001174
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests expect.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single hook through which every test in this class reaches
        # os.fwalk().
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        # Work on copies: the caller's dicts must not be mutated.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide the real error behind an unbound-name error.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1242
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        # Walk with a bytes path but hand str names to the inherited tests.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Copy the caller's in-place edits of the str lists (e.g.
            # pruning) back into the bytes lists owned by os.walk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1255
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        # Walk with a bytes path but hand str names to the inherited tests.
        walker = os.fwalk(os.fsencode(top), *args, **kwargs)
        for broot, bdirs, bfiles, topfd in walker:
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # Copy the caller's in-place edits of the str lists back into
            # the bytes lists owned by os.fwalk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1267
Charles-François Natali7372b062012-02-05 15:15:38 +01001268
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The leaf gets the requested mode; the intermediate
                # directory is expected to be 0o775 under umask 0o002.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when one of the
        # checks below fails, so a failure cannot leak the umask into
        # later tests.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            # makedirs() must refuse a path occupied by a regular file,
            # whatever the value of exist_ok.
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() calls os.removedirs(),
            # which cannot remove a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1361
Andrew Svetlov405faed2012-12-25 12:18:09 +02001362
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        # The directory is shared by every test of the class.
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # None of these values is an integer, even though each one has an
        # integral magnitude: both the uid and the gid slot must reject them.
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        first_two = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and check that each change is visible.
        for wanted_gid in first_two:
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        first_two = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and check that each change is visible.
        for wanted_uid in first_two:
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_two = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls live in one assertRaises block, so the test passes as
        # soon as either of them raises PermissionError.
        # NOTE(review): presumably this tolerates one of the two users being
        # the current user, for whom chown would succeed -- confirm.
        with self.assertRaises(PermissionError):
            for other_uid in first_two:
                os.chown(support.TESTFN, other_uid, gid)
1421
1422
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a small tree below support.TESTFN."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_tree(self):
        """Create TESTFN/dira/dirb and return the pair (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_tree()
        os.removedirs(dirb)
        for removed in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        # A file in 'dira' stops the removal after 'dirb'.
        dira, dirb = self._make_tree()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in 'dirb' makes the very first removal fail.
        dira, dirb = self._make_tree()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1462
1463
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Data written to the null device is accepted.  The file is opened
        # unbuffered (buffering=0); the with statement closes it, so no
        # explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from the null device returns no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001471
Andrew Svetlov405faed2012-12-25 12:18:09 +02001472
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        # Two successive draws are expected to differ.
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return the `count` bytes written by os.urandom() in a child Python.

        The child is run with assert_python_ok(); the length of its standard
        output is checked before it is returned.
        """
        script = '\n'.join([
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ])
        # Index 1 of the result holds the child's standard output.
        stdout = assert_python_ok('-c', script)[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes are expected to produce different bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001502
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001503
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        # Asking for zero bytes returns an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only check that the flag exists.  os.getrandom(1, os.GRND_RANDOM)
        # is deliberately not called, to not consume the rare resource
        # /dev/random.
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail; this also checks that the flag exists.
        # BlockingIOError is tolerated: it means that the system urandom is
        # not initialized yet.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        # Two successive draws are expected to differ.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1545
1546
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# The flag is true as soon as one of these build-time variables is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001553
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter started with
    assert_python_ok(), so descriptor manipulation never affects the test
    runner itself.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful exit of the child is required here; its output
        # is not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Each run prints two 4-byte samples.  The samples must differ from
        # each other and from those of a second run; the test file only
        # contains b"x" bytes, so data read from it would not.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1633
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001634
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If defpath is not None, os.defpath is replaced by it for the duration of
    the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block, so that the
    # finally clause can never refer to an unbound name and hide the real
    # error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1667
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() functions and the internal os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence and an empty first argument are both
        # rejected, for tuples as well as for lists.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with `test_type` (str or bytes) arguments.

        os.execv() and os.execve() are replaced by the recording stubs of
        _execvpe_mockup(), so no program is ever executed.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected execve() path is the encoded path, except on
            # Windows where it stays a str.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is not exercised on Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            winerror = e.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1772
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001773
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing or invalid path."""

    def setUp(self):
        # Every test relies on support.TESTFN not existing: fail early
        # unless os.stat() reports exactly FileNotFoundError.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file and fails if it already exists; mkdir()
        # on the path of that file must then raise OSError.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001808
Victor Stinnere77c9742016-03-25 10:28:23 +01001809
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when given an invalid file descriptor.

    The descriptor comes from support.make_bad_fd().  Most functions are
    expected to raise OSError with errno EBADF (see check()).
    """

    # Names of os functions that take the file descriptor as their only
    # argument.  One test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately left out of the list above:
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for the os function named f.  The generated
        # test does nothing when the os module lacks that function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Insert the generated methods into the namespace of the class body.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF.

        The test fails if the call returns without raising OSError.
        """
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # After the loop, i is the offset of the first descriptor that
        # os.fstat() accepted, or 9 if every one of them was rejected.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        # closerange() must return None when given invalid descriptors.
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf() is called with the bad descriptor as well.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate() is called with the bad descriptor as well.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1908
Brian Curtin1b9df392010-11-24 20:24:31 +00001909
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for leftover in (self.file1, self.file2):
            if not os.path.exists(leftover):
                continue
            os.unlink(leftover)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it to file2 and check they are one file.

        The test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            fd1, fd2 = f1.fileno(), f2.fileno()
            self.assertTrue(os.path.sameopenfile(fd1, fd2))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        # The instance attributes are updated so that tearDown() removes
        # the files created under the new names.
        self.file1 = self.file1 + "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1946
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument validation tests for the os.set*uid() / os.set*gid() family.

    Each test checks that a non-integer argument raises TypeError and that
    a value of 1 << 32 raises OverflowError.  When the process is not root,
    switching to id 0 must also raise OSError; the group variants skip that
    check when HAVE_WHEEL_GROUP is set.
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # Exercise os.seteuid() itself: this test is only guarded by the
        # presence of os.seteuid, and os.setegid has its own test.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Both argument positions are validated.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Both argument positions are validated.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002014
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Access files with non-ASCII names through several os functions.

    setUp() fills a scratch directory with empty files named after the
    support.TESTFN_* constants and records their decoded names in
    self.unicodefn; every test then works from that set.
    """

    def setUp(self):
        # Directory name: first non-empty choice among the support names.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # File names to try; the optional ones are only used when set.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        encoded_names = []
        for candidate in candidates:
            try:
                encoded_names.append(os.fsencode(candidate))
            except UnicodeEncodeError:
                continue
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                text_name = os.fsdecode(raw_name)
                if text_name in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(text_name)
        except BaseException:
            # tearDown() does not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening is the whole test; nothing is read.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002086
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows against freshly spawned interpreters."""

    def _kill(self, sig):
        """Spawn a child, wait until it reports readiness, then kill it.

        The child writes a marker to stdout and blocks on input().  Once
        the marker is visible, *sig* is sent with os.kill() and the
        child's return code is asserted to equal *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_polls * 0.1 seconds.  The loop's else clause
        # runs when the loop ends without break, i.e. on timeout or when
        # the child exited before the marker was seen.
        count, max_polls = 0, 100
        while count < max_polls and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require that it exits.

        *name* is only used in the failure message.  The child runs
        win_console_handler.py from this file's directory and receives
        the tag name of a one-byte shared mmap as its argument.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # NOTE(review): presumably the child sets the shared byte to 1
        # once it is ready -- confirm against win_console_handler.py.
        count, max_polls = 0, 100
        while count < max_polls and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  An
        # exit status of 0 is falsy too, so compare against None instead
        # of testing truthiness.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2201
2202
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Create SUB0, SUB1 (directories) and FILE0, FILE1 (files) under
        # support.TESTFN and remember their names in sorted order.
        entries = []
        for index in range(2):
            sub_name = 'SUB%d' % index
            leaf_name = 'FILE%d' % index
            sub_path = os.path.join(support.TESTFN, sub_name)
            leaf_path = os.path.join(support.TESTFN, leaf_name)
            os.makedirs(sub_path)
            with open(leaf_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % leaf_path)
            entries += [sub_name, leaf_name]
        entries.sort()
        self.created_paths = entries

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)
Tim Golden781bbeb2013-10-25 20:24:06 +01002249
2250
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """os.readlink() with str, bytes and path-like arguments."""

    # Link name and target in str form, followed by their bytes forms.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link_name in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link_name))

    def test_not_symlink(self):
        # A regular file is not a link, whether named by str or path-like.
        regular_files = (self.filelink_target, FakePath(self.filelink_target))
        for regular_file in regular_files:
            self.assertRaises(OSError, os.readlink, regular_file)

    def test_missing_link(self):
        for missing in ('missing-link', FakePath('missing-link')):
            self.assertRaises(FileNotFoundError, os.readlink, missing)

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        # A bytes link name must give a bytes result.
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        # A bytes link name must give a bytes result.
        self.assertIsInstance(resolved, bytes)
2296
2297
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os and os.path on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use assert* methods rather than bare assert statements so the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Assert stat(link) equals stat(target) and differs from lstat(link).

        The same checks are repeated with the link name encoded as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            attempts = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in attempts:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    # Only a crash would be a failure here.
                    pass
                else:
                    # Best-effort removal of a link that did get created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002457
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Junction points created through _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use assert* methods rather than bare assert statements so the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2487
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        """nt._getfinalpathname() and os.stat() must not change the handle count."""
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel32 = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel32.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel32.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel32.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                   ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        this_process = kernel32.GetCurrentProcess()

        def open_handle_count():
            # Ask the OS how many handles this process holds right now.
            counter = ctypes.wintypes.DWORD()
            succeeded = kernel32.GetProcessHandleCount(
                this_process, ctypes.byref(counter))
            self.assertEqual(1, succeeded)
            return counter.value

        handles_before = open_handle_count()

        # The original comment says the first two names test the error
        # path and __file__ the success path.
        # NOTE(review): three '\\?\' names are listed -- confirm which
        # of them are expected to fail.
        probes = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for probe in probes:
                try:
                    nt._getfinalpathname(probe)
                except Exception:
                    # Failure is expected
                    pass
                try:
                    os.stat(probe)
                except Exception:
                    pass

        self.assertEqual(0, open_handle_count() - handles_before)
Tim Golden0321cf22014-05-05 19:46:17 +01002537
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare assert statement is removed under -O, which would leave
        # this test without any check at all.
        self.assertTrue(os.path.isdir(src))
2568
2569
class FSEncodingTests(unittest.TestCase):
    """os.fsencode() / os.fsdecode() pass-through and round-trip checks."""

    def test_nop(self):
        # Input that already has the output type is returned unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # The filesystem encoding cannot represent this name.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002583
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002584
Brett Cannonefb00c02012-02-29 18:31:31 -05002585
class DeviceEncodingTests(unittest.TestCase):
    """os.device_encoding() for a missing descriptor and for stdin."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The reported name must be one that the codecs module knows.
        reported = os.device_encoding(0)
        self.assertIsNotNone(reported)
        self.assertTrue(codecs.lookup(reported))
2599
2600
class PidTests(unittest.TestCase):
    """Parent/child process id bookkeeping."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = FakePath(argv[0])
        child_pid = os.spawnv(os.P_NOWAIT, program, argv)
        # The child runs 'pass', so the reported status must be 0.
        self.assertEqual(os.waitpid(child_pid, 0), (child_pid, 0))
2617
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002618
Victor Stinner4659ccf2016-09-14 10:57:00 +02002619class SpawnTests(unittest.TestCase):
def create_args(self, *, with_env=False, use_bytes=False):
    """Write a small exit-code script and return the argv that runs it.

    The script is written to support.TESTFN, which is registered for
    removal with addCleanup().  self.exitcode is set to the status the
    script exits with.

    with_env: also build self.env, a copy of os.environ plus a unique
        variable named self.key, and make the script read that variable
        so that it fails if the environment was not passed on.
    use_bytes: encode every argv entry with os.fsencode(), and encode
        the keys and values of self.env when with_env is also given.

    Returns [sys.executable, filename], as str or bytes entries.
    """
    self.exitcode = 17

    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)

    if not with_env:
        code = 'import sys; sys.exit(%s)' % self.exitcode
    else:
        self.env = dict(os.environ)
        # create an unique key
        self.key = str(uuid.uuid4())
        self.env[self.key] = self.key
        # read the variable from os.environ to check that it exists
        code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                % (self.key, self.exitcode))

    with open(filename, "w") as fp:
        fp.write(code)

    args = [sys.executable, filename]
    if use_bytes:
        args = [os.fsencode(a) for a in args]
        # self.env only exists when with_env was requested; encoding it
        # unconditionally raised AttributeError for use_bytes alone.
        if with_env:
            self.env = {os.fsencode(k): os.fsencode(v)
                        for k, v in self.env.items()}

    return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002647
@requires_os_func('spawnl')
def test_spawnl(self):
    """os.spawnl() with P_WAIT returns the script's exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnl(os.P_WAIT, program, *argv), self.exitcode)
2653
@requires_os_func('spawnle')
def test_spawnle(self):
    """os.spawnle() with P_WAIT passes self.env and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    # For the spawnl*e variants the environment is the last argument.
    status = os.spawnle(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2659
@requires_os_func('spawnlp')
def test_spawnlp(self):
    """os.spawnlp() with P_WAIT returns the script's exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnlp(os.P_WAIT, program, *argv), self.exitcode)
2665
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
    """os.spawnlpe() with P_WAIT passes self.env and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    # For the spawnl*e variants the environment is the last argument.
    status = os.spawnlpe(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2671
@requires_os_func('spawnv')
def test_spawnv(self):
    """os.spawnv() with P_WAIT returns the script's exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnv(os.P_WAIT, program, argv), self.exitcode)
2677
@requires_os_func('spawnve')
def test_spawnve(self):
    """os.spawnve() with P_WAIT passes self.env and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2683
@requires_os_func('spawnvp')
def test_spawnvp(self):
    """os.spawnvp() with P_WAIT returns the script's exit code."""
    argv = self.create_args()
    program = argv[0]
    self.assertEqual(os.spawnvp(os.P_WAIT, program, argv), self.exitcode)
2689
@requires_os_func('spawnvpe')
def test_spawnvpe(self):
    """os.spawnvpe() with P_WAIT passes self.env and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnvpe(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2695
@requires_os_func('spawnv')
def test_nowait(self):
    """os.spawnv() with P_NOWAIT returns a pid that os.waitpid() can reap."""
    argv = self.create_args()
    child_pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
    waited = os.waitpid(child_pid, 0)
    self.assertEqual(waited[0], child_pid)
    raw_status = waited[1]
    if not hasattr(os, 'WIFEXITED'):
        # Without the W* helpers, expect the exit code in the high byte.
        self.assertEqual(raw_status, self.exitcode << 8)
        return
    self.assertTrue(os.WIFEXITED(raw_status))
    self.assertEqual(os.WEXITSTATUS(raw_status), self.exitcode)
2708
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
    """os.spawnve() accepts bytes for the program, argv and environment."""
    # Test bytes handling in parse_arglist and parse_envlist (#28114)
    argv = self.create_args(with_env=True, use_bytes=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2715
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
    """os.spawnl() raises ValueError for a missing or empty first argument."""
    program = self.create_args()[0]
    # No arguments at all, then a single empty-string argument.
    for bad_args in ((), ('',)):
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program,
                          *bad_args)
Steve Dower859fd7b2016-11-19 18:53:19 -08002721
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
    # spawnle() must raise ValueError both when no argv items precede the
    # environment mapping and when the only argv item is an empty string.
    program = self.create_args()[0]
    with self.assertRaises(ValueError):
        os.spawnle(os.P_NOWAIT, program, {})
    with self.assertRaises(ValueError):
        os.spawnle(os.P_NOWAIT, program, '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002727
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
    # An empty argv, or an argv whose first item is empty, must be
    # rejected with ValueError whether it is passed as a tuple or a list.
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnv(os.P_NOWAIT, program, bad_argv)
Steve Dower859fd7b2016-11-19 18:53:19 -08002735
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
    # Same checks as test_spawnv_noargs, with an (empty) environment
    # mapping passed as the extra spawnve() argument.
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnve(os.P_NOWAIT, program, bad_argv, {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002743
def _test_invalid_env(self, spawn):
    # Shared body of the *_invalid_env tests.  `spawn` is invoked as
    # spawn(mode, path, args, env).
    argv = [sys.executable, '-c', 'pass']

    # Environment entries that the spawn function may either refuse with
    # ValueError or hand over to a child that then exits with status 127.
    suspicious_entries = (
        # null character in the environment variable name
        ("FRUIT\0VEGETABLE", "cabbage"),
        # null character in the environment variable value
        ("FRUIT", "orange\0VEGETABLE=cabbage"),
        # equal character in the environment variable name
        ("FRUIT=ORANGE", "lemon"),
    )
    for name, value in suspicious_entries:
        env = os.environ.copy()
        env[name] = value
        try:
            status = spawn(os.P_WAIT, argv[0], argv, env)
        except ValueError:
            continue
        self.assertEqual(status, 127)

    # equal character in the environment variable value
    script = support.TESTFN
    self.addCleanup(support.unlink, script)
    with open(script, "w") as fp:
        # The child fails with AssertionError unless it sees the value
        # exactly as it was set below.
        fp.write('import sys, os\n'
                 'if os.getenv("FRUIT") != "orange=lemon":\n'
                 ' raise AssertionError')
    argv = [sys.executable, script]
    env = os.environ.copy()
    env["FRUIT"] = "orange=lemon"
    status = spawn(os.P_WAIT, argv[0], argv, env)
    self.assertEqual(status, 0)
2789
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnve().
    spawn = os.spawnve
    self._test_invalid_env(spawn)
2793
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnvpe().
    spawn = os.spawnvpe
    self._test_invalid_env(spawn)
2797
Serhiy Storchaka77703942017-06-25 07:33:01 +03002798
Brian Curtin0151b8e2010-09-24 13:43:43 +00002799# The introduction of this TestCase caused at least two different errors on
2800# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    # Sanity check for os.getlogin().  The outer decorator currently skips
    # the whole class unconditionally.

    def test_getlogin(self):
        # The login name only has to be non-empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2807
2808
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's nice value by one, read it back, and then
        # try to restore the value that was in effect before the test.
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, original + 1)
        try:
            updated = os.getpriority(os.PRIO_PROCESS, pid)
            if original >= 19 and updated <= 19:
                # Starting at 19 or above, the read-back value cannot
                # confirm the increment, so the check is skipped.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % original)
            self.assertEqual(updated, original + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as exc:
                # EACCES while restoring is tolerated (presumably the
                # process lacks the privilege to lower its nice value);
                # any other error is propagated.
                if exc.errno != errno.EACCES:
                    raise
2831
2832
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Threaded asyncore server that records what a client sends.

    The server listens on the given address and runs its asyncore loop in
    its own thread.  The most recently accepted connection is wrapped in a
    Handler, available as `handler_instance`, which collects the received
    bytes for later inspection via `get_data()`.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that buffers everything it receives."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []      # received chunks, in arrival order
            self.accumulate = True   # set to False to discard incoming data
            self.closed = False      # becomes True once handle_close() ran
            # Greeting the client reads back to know the connection is up.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all bytes received so far as a single bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception being handled instead of hiding it.
            # NOTE(review): relies on asyncore calling this while an
            # exception is active -- confirm.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound (the caller may pass port 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True between the start of run() and a call to stop()."""
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to exit and wait for the thread to end."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # Use the lock as a context manager so it is released even if
            # asyncore.loop() raises (e.g. through handle_error()).
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    # Read events on the listening socket are handled like connect events.
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return 0

    def handle_error(self):
        # Re-raise the exception being handled instead of hiding it.
        raise
2917
2918
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Tests for os.sendfile().  Each test sends (parts of) a file through a
    # client socket connected to a SendfileTestServer and then checks what
    # the server-side handler received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # headers/trailers are treated as unsupported on Linux, Solaris and SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (including ECONNRESET, i.e. the peer
        disconnected) is propagated to the caller.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, hence the ** dict.)
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Hashes are compared instead of the payloads themselves.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are sent, followed by
            # the two trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # With SF_NODISKIO the call may legitimately fail with EBUSY or
        # EAGAIN; anything else is an error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003123
3124
def supports_extended_attributes():
    """Return True if a "user." extended attribute can be set on a new file.

    The probe exclusively creates support.TESTFN, tries to set an attribute
    on it through its file descriptor, and always unlinks the file again.
    """
    if not hasattr(os, "setxattr"):
        return False

    supported = True
    try:
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003139
3140
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Exercises get/set/remove/list of extended attributes through paths,
    # through paths with follow_symlinks=False, and through descriptors.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # `s` converts an attribute name to the type under test (str or
        # bytes); the four callables are the xattr functions to exercise.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        initial_names = listxattr(path)
        self.assertIsInstance(initial_names, list)

        # Create the attribute, then overwrite it with XATTR_REPLACE.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected_names = set(initial_names)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(path)), expected_names)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing attribute fails with EEXIST.
        with self.assertRaises(OSError) as cm:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # XATTR_REPLACE on a missing attribute fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected_names)
        removexattr(path, s("user.test"), **kwargs)

        # Once removed, the attribute can no longer be read.
        with self.assertRaises(OSError) as cm:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected_names)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # Round-trip a 1024-byte value.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Set a hundred attributes and check that all of them are listed.
        many = ["user.test{}".format(i) for i in range(100)]
        many.sort()
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial_names) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with bytes names, removing
        # the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Adapters that open the path and call the os function on the
        # resulting file descriptor instead of on the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3224
3225
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _terminal_size_or_skip(self, *args):
        # Call os.get_terminal_size(*args); skip the running test when the
        # size cannot be queried, re-raise any other OSError.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        fields = stty_output.decode().split()
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        actual = self._terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3269
3270
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag: it must be
        # non-inheritable and writable.
        cloexec_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(cloexec_fd, -1)
        self.addCleanup(os.close, cloexec_fd)
        self.assertFalse(os.get_inheritable(cloexec_fd))
        with open(cloexec_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created without flags must be non-inheritable too.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3286
3287
class OSErrorTests(unittest.TestCase):
    # Checks that OSError raised by path-taking os functions carries, as
    # `filename`, the very object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        if support.TESTFN_UNENCODABLE is None:
            decoded = support.TESTFN
        else:
            decoded = support.TESTFN_UNENCODABLE
        # A plain str and a str subclass instance.
        self.unicode_filenames = [decoded, Str(decoded)]

        if support.TESTFN_UNDECODABLE is None:
            encoded = os.fsencode(support.TESTFN)
        else:
            encoded = support.TESTFN_UNDECODABLE
        # bytes plus two other bytes-like flavours.
        self.bytes_filenames = [
            encoded,
            bytearray(encoded),
            memoryview(encoded),
        ]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"

        # Each entry is (names to try, function, *extra positional args).
        funcs = [
            (self.filenames, os.chdir,),
            (self.filenames, os.chmod, 0o777),
            (self.filenames, os.lstat,),
            (self.filenames, os.open, os.O_RDONLY),
            (self.filenames, os.rmdir,),
            (self.filenames, os.stat,),
            (self.filenames, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                (self.unicode_filenames, os.listdir, ),
            ]
        else:
            funcs += [
                (self.filenames, os.listdir,),
                (self.filenames, os.rename, "dst"),
                (self.filenames, os.replace, "dst"),
            ]

        # Functions that exist only on some platforms and accept every
        # flavour of name.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra_args in optional:
            if hasattr(os, attr):
                funcs.append((self.filenames, getattr(os, attr), *extra_args))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((self.bytes_filenames, os.link, b"dst"))
                funcs.append((self.unicode_filenames, os.link, "dst"))
            else:
                funcs.append((self.filenames, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (self.filenames, os.listxattr,),
                (self.filenames, os.getxattr, "user.test"),
                (self.filenames, os.setxattr, "user.test", b'user'),
                (self.filenames, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((self.filenames, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((self.unicode_filenames, os.readlink,))
            else:
                funcs.append((self.filenames, os.readlink,))

        for candidates, func, *func_args in funcs:
            for name in candidates:
                is_plain_name = isinstance(name, (str, bytes))
                try:
                    if not is_plain_name:
                        # Names that are neither str nor bytes are expected
                        # to trigger a DeprecationWarning as well.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                    else:
                        func(name, *func_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3383
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        # A None result means the count is unavailable; there is nothing to
        # verify in that case, so the test is skipped.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3392
Victor Stinnerdaf45552013-08-28 00:53:59 +02003393
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_readonly(self, path=__file__):
        # Open *path* read-only and register the descriptor to be closed
        # when the test finishes.
        fd = os.open(path, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        # Clear FD_CLOEXEC behind os's back, directly through fcntl.
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fd_flags &= ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_readonly()
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         fcntl.FD_CLOEXEC)

        # Making the descriptor inheritable must clear FD_CLOEXEC.
        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         0)

    def test_open(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source_fd = self._open_readonly()

        duplicate = os.dup(source_fd)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source_fd = self._open_readonly('NUL')
        duplicate = os.dup(source_fd)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_readonly()

        # Without an explicit argument the target becomes inheritable.
        default_target = self._open_readonly()
        self.assertEqual(os.dup2(fd, default_target), default_target)
        self.assertTrue(os.get_inheritable(default_target))

        # inheritable=False forces a non-inheritable target.
        private_target = self._open_readonly()
        self.assertEqual(os.dup2(fd, private_target, inheritable=False),
                         private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3481
3482
class PathTConverterTests(unittest.TestCase):
    """Exercise path-argument conversion for a selection of os functions."""

    # Each entry is a tuple of (function name, allows fd arguments,
    # additional arguments to function, cleanup function).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # No bytes variants are tried on Windows.
            bytes_fspath = bytes_filename = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: the descriptor is closed before the
        # file is unlinked.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        candidates = [
            candidate
            for candidate in (str_filename, bytes_filename, str_fspath,
                              bytes_fspath)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                # Functions missing on this platform are silently ignored.
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    continue

                for path in candidates:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object whose __fspath__() yields an int is
                # rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        # The error message names the type returned by __fspath__().
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for value, type_name in cases:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
3547
Brett Cannon3f9183b2016-08-26 14:44:48 -07003548
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip test for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, checking the flag after each step.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3562
3563
Yury Selivanov97e2e062014-09-26 12:33:06 -04003564
class ExportsTests(unittest.TestCase):
    """Check that selected names are listed in os.__all__."""

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3569
3570
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Every test works inside a freshly created directory which is
        # removed again during cleanup.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create *name* inside the test directory and return its full path.
        # A bytes name is joined onto the bytes form of the directory.
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        filename = os.path.join(directory, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the test directory, check that it contains exactly *names*
        # (compared against the sorted entry names) and return a mapping of
        # entry name -> DirEntry.
        by_name = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(by_name), names)
        return by_name

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  When *skip_fields* is true only the
        # "st_*" attributes are compared, leaving out st_dev, st_ino and
        # st_nlink; otherwise both results must compare equal as a whole.
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return
        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Cross-check a DirEntry against os.stat() / os.path for the same
        # path.  NOTE: *is_dir* and *is_file* are accepted but not consulted
        # below; only *is_symlink* influences the comparison.
        on_windows = os.name == 'nt'

        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Following symlinks.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

        # Not following symlinks.
        unfollowed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        can_link = hasattr(os, 'link')
        can_symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if can_link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if can_symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink), kept in sorted name order
        # because get_entries() compares against the sorted listing.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if can_link:
            expected.append(('link_file.txt', False, True, False))
        if can_symlink:
            expected.extend((
                ('symlink_dir', True, False, True),
                ('symlink_file.txt', False, True, True),
            ))

        entries = self.get_entries([item[0] for item in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        # Return the single entry of the test directory, checking that it
        # is the only one and that it is called *name*.  A bytes name makes
        # the scan use the bytes form of the directory.
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        found = list(os.scandir(directory))
        self.assertEqual(len(found), 1)

        only = found[0]
        self.assertEqual(only.name, name)
        return only

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() reports
        # for it.
        created = self.create_file(name=name)
        return self.get_entry(os.path.basename(created))

    def test_current_directory(self):
        filename = self.create_file()
        previous_cwd = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            listing = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(listing),
                             [os.path.basename(filename)])
        finally:
            os.chdir(previous_cwd)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        expected = os.path.join(self.path, 'file.txt')
        self.assertEqual(os.fspath(entry), expected)

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        expected = os.path.join(os.fsencode(self.path), bytes_filename)
        self.assertEqual(fspath, expected)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        on_windows = os.name == 'nt'

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if on_windows:
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if not on_windows:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)
        else:
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        on_windows = os.name == 'nt'

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if on_windows:
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if not on_windows:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)
        else:
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target turns the link into a dangling one.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes directory yields bytes names and paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        # Bytes-like (but not bytes) paths work with a DeprecationWarning
        # and still produce plain bytes results.
        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            stat_takes_dir_fd = os.stat in os.supports_dir_fd
            for entry in entries:
                # When scanning through a descriptor the entry path is just
                # the entry name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if stat_takes_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise exception
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        # Dropping the iterator after the with block must not warn.
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # An explicit close() inside the with block must be harmless.
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        # Dropping the iterator after the failed with block must not warn.
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator, never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3903
Victor Stinner6036e442015-03-08 01:58:04 +01003904
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes arguments come back unchanged
        for raw in b'hello', b'goodbye', b'some/path/and/file':
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str arguments come back unchanged
        for text in 'hello', 'goodbye', 'some/path/and/file':
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for wrapped in "path/like/object", b"path/like/object":
            pathlike = FakePath(wrapped)

            self.assertEqual(wrapped, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are neither str, bytes nor path-like are rejected.
        vapor = type('blah', (), {})
        for junk in int, type, os, vapor():
            self.assertRaises(TypeError, self.fspath, junk)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003950
Victor Stinnerc29b5852017-11-02 07:28:27 -07003951
class TimesTests(unittest.TestCase):
    """Check the shape of the os.times() result."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every field is reported as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # These three fields are expected to be zero on Windows.
        if os.name == 'nt':
            self.assertEqual(result.children_user, 0)
            self.assertEqual(result.children_system, 0)
            self.assertEqual(result.elapsed, 0)
3966
3967
# os._fspath is looked up to decide whether a separate pure Python run is
# needed: when it is absent, TestPEP519 has already exercised the pure Python
# implementation through os.fspath.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003976
3977
# Run the whole module through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()