blob: 74aef47243428470ea29b42320c1b3231a14adac [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070033from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020034
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import pwd
49 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010050except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050051 all_users = []
52try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020053 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020054except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020055 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020056
Berker Peksagce643912015-05-06 06:33:17 +030057from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020058from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000059
Victor Stinner923590e2016-03-24 09:11:48 +010060
R David Murrayf2ad1732014-12-25 18:36:56 -050061root_in_posix = False
62if hasattr(os, 'geteuid'):
63 root_in_posix = (os.geteuid() == 0)
64
Mark Dickinson7cf03892010-04-16 13:45:35 +000065# Detect whether we're on a Linux system that uses the (now outdated
66# and unmaintained) linuxthreads threading library. There's an issue
67# when combining linuxthreads with a failed execv call: see
68# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020069if hasattr(sys, 'thread_info') and sys.thread_info.version:
70 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
71else:
72 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000073
Stefan Krahebee49a2013-01-17 15:31:00 +010074# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
75HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
76
Victor Stinner923590e2016-03-24 09:11:48 +010077
Berker Peksag4af23d72016-09-15 20:32:44 +030078def requires_os_func(name):
79 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
80
81
Victor Stinnerae39d232016-03-24 17:12:55 +010082def create_file(filename, content=b'content'):
83 with open(filename, "xb", 0) as fp:
84 fp.write(content)
85
86
Victor Stinner689830e2019-06-26 17:31:12 +020087class MiscTests(unittest.TestCase):
88 def test_getcwd(self):
89 cwd = os.getcwd()
90 self.assertIsInstance(cwd, str)
91
Victor Stinnerec3e20a2019-06-28 18:01:59 +020092 def test_getcwd_long_path(self):
93 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
94 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
95 # longer path if longer paths support is enabled. Internally, the os
96 # module uses MAXPATHLEN which is at least 1024.
97 #
98 # Use a directory name of 200 characters to fit into Windows MAX_PATH
99 # limit.
100 #
101 # On Windows, the test can stop when trying to create a path longer
102 # than MAX_PATH if long paths support is disabled:
103 # see RtlAreLongPathsEnabled().
104 min_len = 2000 # characters
105 dirlen = 200 # characters
106 dirname = 'python_test_dir_'
107 dirname = dirname + ('a' * (dirlen - len(dirname)))
108
109 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200110 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200111 expected = path
112
113 while True:
114 cwd = os.getcwd()
115 self.assertEqual(cwd, expected)
116
117 need = min_len - (len(cwd) + len(os.path.sep))
118 if need <= 0:
119 break
120 if len(dirname) > need and need > 0:
121 dirname = dirname[:need]
122
123 path = os.path.join(path, dirname)
124 try:
125 os.mkdir(path)
126 # On Windows, chdir() can fail
127 # even if mkdir() succeeded
128 os.chdir(path)
129 except FileNotFoundError:
130 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
131 # ERROR_FILENAME_EXCED_RANGE (206) errors
132 # ("The filename or extension is too long")
133 break
134 except OSError as exc:
135 if exc.errno == errno.ENAMETOOLONG:
136 break
137 else:
138 raise
139
140 expected = path
141
142 if support.verbose:
143 print(f"Tested current directory length: {len(cwd)}")
144
Victor Stinner689830e2019-06-26 17:31:12 +0200145 def test_getcwdb(self):
146 cwd = os.getcwdb()
147 self.assertIsInstance(cwd, bytes)
148 self.assertEqual(os.fsdecode(cwd), os.getcwd())
149
150
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000151# Tests creating TESTFN
152class FileTests(unittest.TestCase):
153 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000154 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000155 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000156 tearDown = setUp
157
158 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000159 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000160 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000161 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000162
Christian Heimesfdab48e2008-01-20 09:06:41 +0000163 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000164 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
165 # We must allocate two consecutive file descriptors, otherwise
166 # it will mess up other file descriptors (perhaps even the three
167 # standard ones).
168 second = os.dup(first)
169 try:
170 retries = 0
171 while second != first + 1:
172 os.close(first)
173 retries += 1
174 if retries > 10:
175 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000176 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000177 first, second = second, os.dup(second)
178 finally:
179 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000180 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000181 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000182 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000183
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000184 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000185 def test_rename(self):
186 path = support.TESTFN
187 old = sys.getrefcount(path)
188 self.assertRaises(TypeError, os.rename, path, 0)
189 new = sys.getrefcount(path)
190 self.assertEqual(old, new)
191
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000192 def test_read(self):
193 with open(support.TESTFN, "w+b") as fobj:
194 fobj.write(b"spam")
195 fobj.flush()
196 fd = fobj.fileno()
197 os.lseek(fd, 0, 0)
198 s = os.read(fd, 4)
199 self.assertEqual(type(s), bytes)
200 self.assertEqual(s, b"spam")
201
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200202 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200203 # Skip the test on 32-bit platforms: the number of bytes must fit in a
204 # Py_ssize_t type
205 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
206 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200207 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
208 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200209 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100210 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200211
212 # Issue #21932: Make sure that os.read() does not raise an
213 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200214 with open(support.TESTFN, "rb") as fp:
215 data = os.read(fp.fileno(), size)
216
Victor Stinner8c663fd2017-11-08 14:44:44 -0800217 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200218 # operating system is free to return less bytes than requested.
219 self.assertEqual(data, b'test')
220
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000221 def test_write(self):
222 # os.write() accepts bytes- and buffer-like objects but not strings
223 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
224 self.assertRaises(TypeError, os.write, fd, "beans")
225 os.write(fd, b"bacon\n")
226 os.write(fd, bytearray(b"eggs\n"))
227 os.write(fd, memoryview(b"spam\n"))
228 os.close(fd)
229 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000230 self.assertEqual(fobj.read().splitlines(),
231 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000232
Victor Stinnere0daff12011-03-20 23:36:35 +0100233 def write_windows_console(self, *args):
234 retcode = subprocess.call(args,
235 # use a new console to not flood the test output
236 creationflags=subprocess.CREATE_NEW_CONSOLE,
237 # use a shell to hide the console window (SW_HIDE)
238 shell=True)
239 self.assertEqual(retcode, 0)
240
241 @unittest.skipUnless(sys.platform == 'win32',
242 'test specific to the Windows console')
243 def test_write_windows_console(self):
244 # Issue #11395: the Windows console returns an error (12: not enough
245 # space error) on writing into stdout if stdout mode is binary and the
246 # length is greater than 66,000 bytes (or less, depending on heap
247 # usage).
248 code = "print('x' * 100000)"
249 self.write_windows_console(sys.executable, "-c", code)
250 self.write_windows_console(sys.executable, "-u", "-c", code)
251
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000252 def fdopen_helper(self, *args):
253 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200254 f = os.fdopen(fd, *args)
255 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000256
257 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200258 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
259 os.close(fd)
260
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000261 self.fdopen_helper()
262 self.fdopen_helper('r')
263 self.fdopen_helper('r', 100)
264
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100265 def test_replace(self):
266 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100267 self.addCleanup(support.unlink, support.TESTFN)
268 self.addCleanup(support.unlink, TESTFN2)
269
270 create_file(support.TESTFN, b"1")
271 create_file(TESTFN2, b"2")
272
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100273 os.replace(support.TESTFN, TESTFN2)
274 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
275 with open(TESTFN2, 'r') as f:
276 self.assertEqual(f.read(), "1")
277
Martin Panterbf19d162015-09-09 01:01:13 +0000278 def test_open_keywords(self):
279 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
280 dir_fd=None)
281 os.close(f)
282
283 def test_symlink_keywords(self):
284 symlink = support.get_attribute(os, "symlink")
285 try:
286 symlink(src='target', dst=support.TESTFN,
287 target_is_directory=False, dir_fd=None)
288 except (NotImplementedError, OSError):
289 pass # No OS support or unprivileged user
290
Pablo Galindoaac4d032019-05-31 19:39:47 +0100291 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
292 def test_copy_file_range_invalid_values(self):
293 with self.assertRaises(ValueError):
294 os.copy_file_range(0, 1, -10)
295
296 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
297 def test_copy_file_range(self):
298 TESTFN2 = support.TESTFN + ".3"
299 data = b'0123456789'
300
301 create_file(support.TESTFN, data)
302 self.addCleanup(support.unlink, support.TESTFN)
303
304 in_file = open(support.TESTFN, 'rb')
305 self.addCleanup(in_file.close)
306 in_fd = in_file.fileno()
307
308 out_file = open(TESTFN2, 'w+b')
309 self.addCleanup(support.unlink, TESTFN2)
310 self.addCleanup(out_file.close)
311 out_fd = out_file.fileno()
312
313 try:
314 i = os.copy_file_range(in_fd, out_fd, 5)
315 except OSError as e:
316 # Handle the case in which Python was compiled
317 # in a system with the syscall but without support
318 # in the kernel.
319 if e.errno != errno.ENOSYS:
320 raise
321 self.skipTest(e)
322 else:
323 # The number of copied bytes can be less than
324 # the number of bytes originally requested.
325 self.assertIn(i, range(0, 6));
326
327 with open(TESTFN2, 'rb') as in_file:
328 self.assertEqual(in_file.read(), data[:i])
329
330 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
331 def test_copy_file_range_offset(self):
332 TESTFN4 = support.TESTFN + ".4"
333 data = b'0123456789'
334 bytes_to_copy = 6
335 in_skip = 3
336 out_seek = 5
337
338 create_file(support.TESTFN, data)
339 self.addCleanup(support.unlink, support.TESTFN)
340
341 in_file = open(support.TESTFN, 'rb')
342 self.addCleanup(in_file.close)
343 in_fd = in_file.fileno()
344
345 out_file = open(TESTFN4, 'w+b')
346 self.addCleanup(support.unlink, TESTFN4)
347 self.addCleanup(out_file.close)
348 out_fd = out_file.fileno()
349
350 try:
351 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
352 offset_src=in_skip,
353 offset_dst=out_seek)
354 except OSError as e:
355 # Handle the case in which Python was compiled
356 # in a system with the syscall but without support
357 # in the kernel.
358 if e.errno != errno.ENOSYS:
359 raise
360 self.skipTest(e)
361 else:
362 # The number of copied bytes can be less than
363 # the number of bytes originally requested.
364 self.assertIn(i, range(0, bytes_to_copy+1));
365
366 with open(TESTFN4, 'rb') as in_file:
367 read = in_file.read()
368 # seeked bytes (5) are zero'ed
369 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
370 # 012 are skipped (in_skip)
371 # 345678 are copied in the file (in_skip + bytes_to_copy)
372 self.assertEqual(read[out_seek:],
373 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200374
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000375# Test attributes on return values from os.*stat* family.
376class StatAttributeTests(unittest.TestCase):
377 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200378 self.fname = support.TESTFN
379 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100380 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000381
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000383 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000384
385 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000386 self.assertEqual(result[stat.ST_SIZE], 3)
387 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000389 # Make sure all the attributes are there
390 members = dir(result)
391 for name in dir(stat):
392 if name[:3] == 'ST_':
393 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000394 if name.endswith("TIME"):
395 def trunc(x): return int(x)
396 else:
397 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000398 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000399 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000400 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000401
Larry Hastings6fe20b32012-04-19 15:07:49 -0700402 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700403 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700404 for name in 'st_atime st_mtime st_ctime'.split():
405 floaty = int(getattr(result, name) * 100000)
406 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700407 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700408
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000409 try:
410 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200411 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000412 except IndexError:
413 pass
414
415 # Make sure that assignment fails
416 try:
417 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200418 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000419 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000420 pass
421
422 try:
423 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200424 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000425 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000426 pass
427
428 try:
429 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200430 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000431 except AttributeError:
432 pass
433
434 # Use the stat_result constructor with a too-short tuple.
435 try:
436 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200437 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000438 except TypeError:
439 pass
440
Ezio Melotti42da6632011-03-15 05:18:48 +0200441 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000442 try:
443 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
444 except TypeError:
445 pass
446
Antoine Pitrou38425292010-09-21 18:19:07 +0000447 def test_stat_attributes(self):
448 self.check_stat_attributes(self.fname)
449
450 def test_stat_attributes_bytes(self):
451 try:
452 fname = self.fname.encode(sys.getfilesystemencoding())
453 except UnicodeEncodeError:
454 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700455 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000456
Christian Heimes25827622013-10-12 01:27:08 +0200457 def test_stat_result_pickle(self):
458 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200459 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
460 p = pickle.dumps(result, proto)
461 self.assertIn(b'stat_result', p)
462 if proto < 4:
463 self.assertIn(b'cos\nstat_result\n', p)
464 unpickled = pickle.loads(p)
465 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200466
Serhiy Storchaka43767632013-11-03 21:31:38 +0200467 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000468 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700469 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000470
471 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000472 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000473
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000474 # Make sure all the attributes are there.
475 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
476 'ffree', 'favail', 'flag', 'namemax')
477 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000478 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000479
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100480 self.assertTrue(isinstance(result.f_fsid, int))
481
482 # Test that the size of the tuple doesn't change
483 self.assertEqual(len(result), 10)
484
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000485 # Make sure that assignment really fails
486 try:
487 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200488 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000489 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000490 pass
491
492 try:
493 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200494 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000495 except AttributeError:
496 pass
497
498 # Use the constructor with a too-short tuple.
499 try:
500 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200501 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000502 except TypeError:
503 pass
504
Ezio Melotti42da6632011-03-15 05:18:48 +0200505 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000506 try:
507 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
508 except TypeError:
509 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000510
Christian Heimes25827622013-10-12 01:27:08 +0200511 @unittest.skipUnless(hasattr(os, 'statvfs'),
512 "need os.statvfs()")
513 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700514 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200515
Serhiy Storchakabad12572014-12-15 14:03:42 +0200516 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
517 p = pickle.dumps(result, proto)
518 self.assertIn(b'statvfs_result', p)
519 if proto < 4:
520 self.assertIn(b'cos\nstatvfs_result\n', p)
521 unpickled = pickle.loads(p)
522 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200523
Serhiy Storchaka43767632013-11-03 21:31:38 +0200524 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
525 def test_1686475(self):
526 # Verify that an open file can be stat'ed
527 try:
528 os.stat(r"c:\pagefile.sys")
529 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600530 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200531 except OSError as e:
532 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000533
Serhiy Storchaka43767632013-11-03 21:31:38 +0200534 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
535 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
536 def test_15261(self):
537 # Verify that stat'ing a closed fd does not cause crash
538 r, w = os.pipe()
539 try:
540 os.stat(r) # should not raise error
541 finally:
542 os.close(r)
543 os.close(w)
544 with self.assertRaises(OSError) as ctx:
545 os.stat(r)
546 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100547
Zachary Ware63f277b2014-06-19 09:46:37 -0500548 def check_file_attributes(self, result):
549 self.assertTrue(hasattr(result, 'st_file_attributes'))
550 self.assertTrue(isinstance(result.st_file_attributes, int))
551 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
552
553 @unittest.skipUnless(sys.platform == "win32",
554 "st_file_attributes is Win32 specific")
555 def test_file_attributes(self):
556 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
557 result = os.stat(self.fname)
558 self.check_file_attributes(result)
559 self.assertEqual(
560 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
561 0)
562
563 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200564 dirname = support.TESTFN + "dir"
565 os.mkdir(dirname)
566 self.addCleanup(os.rmdir, dirname)
567
568 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500569 self.check_file_attributes(result)
570 self.assertEqual(
571 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
572 stat.FILE_ATTRIBUTE_DIRECTORY)
573
Berker Peksag0b4dc482016-09-17 15:49:59 +0300574 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
575 def test_access_denied(self):
576 # Default to FindFirstFile WIN32_FIND_DATA when access is
577 # denied. See issue 28075.
578 # os.environ['TEMP'] should be located on a volume that
579 # supports file ACLs.
580 fname = os.path.join(os.environ['TEMP'], self.fname)
581 self.addCleanup(support.unlink, fname)
582 create_file(fname, b'ABC')
583 # Deny the right to [S]YNCHRONIZE on the file to
584 # force CreateFile to fail with ERROR_ACCESS_DENIED.
585 DETACHED_PROCESS = 8
586 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500587 # bpo-30584: Use security identifier *S-1-5-32-545 instead
588 # of localized "Users" to not depend on the locale.
589 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300590 creationflags=DETACHED_PROCESS
591 )
592 result = os.stat(fname)
593 self.assertNotEqual(result.st_size, 0)
594
Steve Dower772ec0f2019-09-04 14:42:54 -0700595 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
596 def test_stat_block_device(self):
597 # bpo-38030: os.stat fails for block devices
598 # Test a filename like "//./C:"
599 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
600 result = os.stat(fname)
601 self.assertEqual(result.st_mode, stat.S_IFBLK)
602
Victor Stinner47aacc82015-06-12 17:26:23 +0200603
604class UtimeTests(unittest.TestCase):
605 def setUp(self):
606 self.dirname = support.TESTFN
607 self.fname = os.path.join(self.dirname, "f1")
608
609 self.addCleanup(support.rmtree, self.dirname)
610 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100611 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200612
Victor Stinner47aacc82015-06-12 17:26:23 +0200613 def support_subsecond(self, filename):
614 # Heuristic to check if the filesystem supports timestamp with
615 # subsecond resolution: check if float and int timestamps are different
616 st = os.stat(filename)
617 return ((st.st_atime != st[7])
618 or (st.st_mtime != st[8])
619 or (st.st_ctime != st[9]))
620
621 def _test_utime(self, set_time, filename=None):
622 if not filename:
623 filename = self.fname
624
625 support_subsecond = self.support_subsecond(filename)
626 if support_subsecond:
627 # Timestamp with a resolution of 1 microsecond (10^-6).
628 #
629 # The resolution of the C internal function used by os.utime()
630 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
631 # test with a resolution of 1 ns requires more work:
632 # see the issue #15745.
633 atime_ns = 1002003000 # 1.002003 seconds
634 mtime_ns = 4005006000 # 4.005006 seconds
635 else:
636 # use a resolution of 1 second
637 atime_ns = 5 * 10**9
638 mtime_ns = 8 * 10**9
639
640 set_time(filename, (atime_ns, mtime_ns))
641 st = os.stat(filename)
642
643 if support_subsecond:
644 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
645 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
646 else:
647 self.assertEqual(st.st_atime, atime_ns * 1e-9)
648 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
649 self.assertEqual(st.st_atime_ns, atime_ns)
650 self.assertEqual(st.st_mtime_ns, mtime_ns)
651
652 def test_utime(self):
653 def set_time(filename, ns):
654 # test the ns keyword parameter
655 os.utime(filename, ns=ns)
656 self._test_utime(set_time)
657
658 @staticmethod
659 def ns_to_sec(ns):
660 # Convert a number of nanosecond (int) to a number of seconds (float).
661 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
662 # issue, os.utime() rounds towards minus infinity.
663 return (ns * 1e-9) + 0.5e-9
664
665 def test_utime_by_indexed(self):
666 # pass times as floating point seconds as the second indexed parameter
667 def set_time(filename, ns):
668 atime_ns, mtime_ns = ns
669 atime = self.ns_to_sec(atime_ns)
670 mtime = self.ns_to_sec(mtime_ns)
671 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
672 # or utime(time_t)
673 os.utime(filename, (atime, mtime))
674 self._test_utime(set_time)
675
676 def test_utime_by_times(self):
677 def set_time(filename, ns):
678 atime_ns, mtime_ns = ns
679 atime = self.ns_to_sec(atime_ns)
680 mtime = self.ns_to_sec(mtime_ns)
681 # test the times keyword parameter
682 os.utime(filename, times=(atime, mtime))
683 self._test_utime(set_time)
684
685 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
686 "follow_symlinks support for utime required "
687 "for this test.")
688 def test_utime_nofollow_symlinks(self):
689 def set_time(filename, ns):
690 # use follow_symlinks=False to test utimensat(timespec)
691 # or lutimes(timeval)
692 os.utime(filename, ns=ns, follow_symlinks=False)
693 self._test_utime(set_time)
694
695 @unittest.skipUnless(os.utime in os.supports_fd,
696 "fd support for utime required for this test.")
697 def test_utime_fd(self):
698 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100699 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200700 # use a file descriptor to test futimens(timespec)
701 # or futimes(timeval)
702 os.utime(fp.fileno(), ns=ns)
703 self._test_utime(set_time)
704
705 @unittest.skipUnless(os.utime in os.supports_dir_fd,
706 "dir_fd support for utime required for this test.")
707 def test_utime_dir_fd(self):
708 def set_time(filename, ns):
709 dirname, name = os.path.split(filename)
710 dirfd = os.open(dirname, os.O_RDONLY)
711 try:
712 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
713 os.utime(name, dir_fd=dirfd, ns=ns)
714 finally:
715 os.close(dirfd)
716 self._test_utime(set_time)
717
718 def test_utime_directory(self):
719 def set_time(filename, ns):
720 # test calling os.utime() on a directory
721 os.utime(filename, ns=ns)
722 self._test_utime(set_time, filename=self.dirname)
723
724 def _test_utime_current(self, set_time):
725 # Get the system clock
726 current = time.time()
727
728 # Call os.utime() to set the timestamp to the current system clock
729 set_time(self.fname)
730
731 if not self.support_subsecond(self.fname):
732 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700733 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200734 # On Windows, the usual resolution of time.time() is 15.6 ms.
735 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700736 #
737 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
738 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200739 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200740 st = os.stat(self.fname)
741 msg = ("st_time=%r, current=%r, dt=%r"
742 % (st.st_mtime, current, st.st_mtime - current))
743 self.assertAlmostEqual(st.st_mtime, current,
744 delta=delta, msg=msg)
745
746 def test_utime_current(self):
747 def set_time(filename):
748 # Set to the current time in the new way
749 os.utime(self.fname)
750 self._test_utime_current(set_time)
751
752 def test_utime_current_old(self):
753 def set_time(filename):
754 # Set to the current time in the old explicit way.
755 os.utime(self.fname, None)
756 self._test_utime_current(set_time)
757
758 def get_file_system(self, path):
759 if sys.platform == 'win32':
760 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
761 import ctypes
762 kernel32 = ctypes.windll.kernel32
763 buf = ctypes.create_unicode_buffer("", 100)
764 ok = kernel32.GetVolumeInformationW(root, None, 0,
765 None, None, None,
766 buf, len(buf))
767 if ok:
768 return buf.value
769 # return None if the filesystem is unknown
770
771 def test_large_time(self):
772 # Many filesystems are limited to the year 2038. At least, the test
773 # pass with NTFS filesystem.
774 if self.get_file_system(self.dirname) != "NTFS":
775 self.skipTest("requires NTFS")
776
777 large = 5000000000 # some day in 2128
778 os.utime(self.fname, (large, large))
779 self.assertEqual(os.stat(self.fname).st_mtime, large)
780
781 def test_utime_invalid_arguments(self):
782 # seconds and nanoseconds parameters are mutually exclusive
783 with self.assertRaises(ValueError):
784 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200785 with self.assertRaises(TypeError):
786 os.utime(self.fname, [5, 5])
787 with self.assertRaises(TypeError):
788 os.utime(self.fname, (5,))
789 with self.assertRaises(TypeError):
790 os.utime(self.fname, (5, 5, 5))
791 with self.assertRaises(TypeError):
792 os.utime(self.fname, ns=[5, 5])
793 with self.assertRaises(TypeError):
794 os.utime(self.fname, ns=(5,))
795 with self.assertRaises(TypeError):
796 os.utime(self.fname, ns=(5, 5, 5))
797
798 if os.utime not in os.supports_follow_symlinks:
799 with self.assertRaises(NotImplementedError):
800 os.utime(self.fname, (5, 5), follow_symlinks=False)
801 if os.utime not in os.supports_fd:
802 with open(self.fname, 'wb', 0) as fp:
803 with self.assertRaises(TypeError):
804 os.utime(fp.fileno(), (5, 5))
805 if os.utime not in os.supports_dir_fd:
806 with self.assertRaises(NotImplementedError):
807 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200808
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300809 @support.cpython_only
810 def test_issue31577(self):
811 # The interpreter shouldn't crash in case utime() received a bad
812 # ns argument.
813 def get_bad_int(divmod_ret_val):
814 class BadInt:
815 def __divmod__(*args):
816 return divmod_ret_val
817 return BadInt()
818 with self.assertRaises(TypeError):
819 os.utime(self.fname, ns=(get_bad_int(42), 1))
820 with self.assertRaises(TypeError):
821 os.utime(self.fname, ns=(get_bad_int(()), 1))
822 with self.assertRaises(TypeError):
823 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
824
Victor Stinner47aacc82015-06-12 17:26:23 +0200825
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000826from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000827
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000828class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000829 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000830 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000831
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000832 def setUp(self):
833 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000834 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000835 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000836 for key, value in self._reference().items():
837 os.environ[key] = value
838
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000839 def tearDown(self):
840 os.environ.clear()
841 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000842 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000843 os.environb.clear()
844 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000845
Christian Heimes90333392007-11-01 19:08:42 +0000846 def _reference(self):
847 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
848
849 def _empty_mapping(self):
850 os.environ.clear()
851 return os.environ
852
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000853 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200854 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
855 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000856 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000857 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300858 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200859 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300860 value = popen.read().strip()
861 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000862
Xavier de Gayed1415312016-07-22 12:15:29 +0200863 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
864 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000865 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200866 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
867 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300868 it = iter(popen)
869 self.assertEqual(next(it), "line1\n")
870 self.assertEqual(next(it), "line2\n")
871 self.assertEqual(next(it), "line3\n")
872 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000873
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000874 # Verify environ keys and values from the OS are of the
875 # correct str type.
876 def test_keyvalue_types(self):
877 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000878 self.assertEqual(type(key), str)
879 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000880
Christian Heimes90333392007-11-01 19:08:42 +0000881 def test_items(self):
882 for key, value in self._reference().items():
883 self.assertEqual(os.environ.get(key), value)
884
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000885 # Issue 7310
886 def test___repr__(self):
887 """Check that the repr() of os.environ looks like environ({...})."""
888 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000889 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
890 '{!r}: {!r}'.format(key, value)
891 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000892
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000893 def test_get_exec_path(self):
894 defpath_list = os.defpath.split(os.pathsep)
895 test_path = ['/monty', '/python', '', '/flying/circus']
896 test_env = {'PATH': os.pathsep.join(test_path)}
897
898 saved_environ = os.environ
899 try:
900 os.environ = dict(test_env)
901 # Test that defaulting to os.environ works.
902 self.assertSequenceEqual(test_path, os.get_exec_path())
903 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
904 finally:
905 os.environ = saved_environ
906
907 # No PATH environment variable
908 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
909 # Empty PATH environment variable
910 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
911 # Supplied PATH environment variable
912 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
913
Victor Stinnerb745a742010-05-18 17:17:23 +0000914 if os.supports_bytes_environ:
915 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000916 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000917 # ignore BytesWarning warning
918 with warnings.catch_warnings(record=True):
919 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000920 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000921 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000922 pass
923 else:
924 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000925
926 # bytes key and/or value
927 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
928 ['abc'])
929 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
930 ['abc'])
931 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
932 ['abc'])
933
934 @unittest.skipUnless(os.supports_bytes_environ,
935 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000936 def test_environb(self):
937 # os.environ -> os.environb
938 value = 'euro\u20ac'
939 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000940 value_bytes = value.encode(sys.getfilesystemencoding(),
941 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000942 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000943 msg = "U+20AC character is not encodable to %s" % (
944 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000945 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000946 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000947 self.assertEqual(os.environ['unicode'], value)
948 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000949
950 # os.environb -> os.environ
951 value = b'\xff'
952 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000953 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000954 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000955 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000956
Victor Stinner161e7b32020-01-24 11:53:44 +0100957 def test_putenv_unsetenv(self):
958 name = "PYTHONTESTVAR"
959 value = "testvalue"
960 code = f'import os; print(repr(os.environ.get({name!r})))'
961
962 with support.EnvironmentVarGuard() as env:
963 env.pop(name, None)
964
965 os.putenv(name, value)
966 proc = subprocess.run([sys.executable, '-c', code], check=True,
967 stdout=subprocess.PIPE, text=True)
968 self.assertEqual(proc.stdout.rstrip(), repr(value))
969
970 os.unsetenv(name)
971 proc = subprocess.run([sys.executable, '-c', code], check=True,
972 stdout=subprocess.PIPE, text=True)
973 self.assertEqual(proc.stdout.rstrip(), repr(None))
974
Victor Stinner13ff2452018-01-22 18:32:50 +0100975 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100976 @support.requires_mac_ver(10, 6)
Victor Stinner161e7b32020-01-24 11:53:44 +0100977 def test_putenv_unsetenv_error(self):
978 # Empty variable name is invalid.
979 # "=" and null character are not allowed in a variable name.
980 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
981 self.assertRaises((OSError, ValueError), os.putenv, name, "value")
982 self.assertRaises((OSError, ValueError), os.unsetenv, name)
983
Victor Stinnerb73dd022020-01-22 21:11:17 +0100984 if sys.platform == "win32":
Victor Stinner161e7b32020-01-24 11:53:44 +0100985 # On Windows, an environment variable string ("name=value" string)
986 # is limited to 32,767 characters
987 longstr = 'x' * 32_768
988 self.assertRaises(ValueError, os.putenv, longstr, "1")
989 self.assertRaises(ValueError, os.putenv, "X", longstr)
990 self.assertRaises(ValueError, os.unsetenv, longstr)
Victor Stinner60b385e2011-11-22 22:01:28 +0100991
Victor Stinner6d101392013-04-14 16:35:04 +0200992 def test_key_type(self):
993 missing = 'missingkey'
994 self.assertNotIn(missing, os.environ)
995
Victor Stinner839e5ea2013-04-14 16:43:03 +0200996 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200997 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200998 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200999 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +02001000
Victor Stinner839e5ea2013-04-14 16:43:03 +02001001 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +02001002 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +02001003 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +02001004 self.assertTrue(cm.exception.__suppress_context__)
1005
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -03001006 def _test_environ_iteration(self, collection):
1007 iterator = iter(collection)
1008 new_key = "__new_key__"
1009
1010 next(iterator) # start iteration over os.environ.items
1011
1012 # add a new key in os.environ mapping
1013 os.environ[new_key] = "test_environ_iteration"
1014
1015 try:
1016 next(iterator) # force iteration over modified mapping
1017 self.assertEqual(os.environ[new_key], "test_environ_iteration")
1018 finally:
1019 del os.environ[new_key]
1020
1021 def test_iter_error_when_changing_os_environ(self):
1022 self._test_environ_iteration(os.environ)
1023
1024 def test_iter_error_when_changing_os_environ_items(self):
1025 self._test_environ_iteration(os.environ.items())
1026
1027 def test_iter_error_when_changing_os_environ_values(self):
1028 self._test_environ_iteration(os.environ.values())
1029
Charles Burklandd648ef12020-03-13 09:04:43 -07001030 def _test_underlying_process_env(self, var, expected):
1031 if not (unix_shell and os.path.exists(unix_shell)):
1032 return
1033
1034 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1035 value = popen.read().strip()
1036
1037 self.assertEqual(expected, value)
1038
1039 def test_or_operator(self):
1040 overridden_key = '_TEST_VAR_'
1041 original_value = 'original_value'
1042 os.environ[overridden_key] = original_value
1043
1044 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1045 expected = dict(os.environ)
1046 expected.update(new_vars_dict)
1047
1048 actual = os.environ | new_vars_dict
1049 self.assertDictEqual(expected, actual)
1050 self.assertEqual('3', actual[overridden_key])
1051
1052 new_vars_items = new_vars_dict.items()
1053 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1054
1055 self._test_underlying_process_env('_A_', '')
1056 self._test_underlying_process_env(overridden_key, original_value)
1057
1058 def test_ior_operator(self):
1059 overridden_key = '_TEST_VAR_'
1060 os.environ[overridden_key] = 'original_value'
1061
1062 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1063 expected = dict(os.environ)
1064 expected.update(new_vars_dict)
1065
1066 os.environ |= new_vars_dict
1067 self.assertEqual(expected, os.environ)
1068 self.assertEqual('3', os.environ[overridden_key])
1069
1070 self._test_underlying_process_env('_A_', '1')
1071 self._test_underlying_process_env(overridden_key, '3')
1072
1073 def test_ior_operator_invalid_dicts(self):
1074 os_environ_copy = os.environ.copy()
1075 with self.assertRaises(TypeError):
1076 dict_with_bad_key = {1: '_A_'}
1077 os.environ |= dict_with_bad_key
1078
1079 with self.assertRaises(TypeError):
1080 dict_with_bad_val = {'_A_': 1}
1081 os.environ |= dict_with_bad_val
1082
1083 # Check nothing was added.
1084 self.assertEqual(os_environ_copy, os.environ)
1085
1086 def test_ior_operator_key_value_iterable(self):
1087 overridden_key = '_TEST_VAR_'
1088 os.environ[overridden_key] = 'original_value'
1089
1090 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1091 expected = dict(os.environ)
1092 expected.update(new_vars_items)
1093
1094 os.environ |= new_vars_items
1095 self.assertEqual(expected, os.environ)
1096 self.assertEqual('3', os.environ[overridden_key])
1097
1098 self._test_underlying_process_env('_A_', '1')
1099 self._test_underlying_process_env(overridden_key, '3')
1100
1101 def test_ror_operator(self):
1102 overridden_key = '_TEST_VAR_'
1103 original_value = 'original_value'
1104 os.environ[overridden_key] = original_value
1105
1106 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1107 expected = dict(new_vars_dict)
1108 expected.update(os.environ)
1109
1110 actual = new_vars_dict | os.environ
1111 self.assertDictEqual(expected, actual)
1112 self.assertEqual(original_value, actual[overridden_key])
1113
1114 new_vars_items = new_vars_dict.items()
1115 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1116
1117 self._test_underlying_process_env('_A_', '')
1118 self._test_underlying_process_env(overridden_key, original_value)
1119
Victor Stinner6d101392013-04-14 16:35:04 +02001120
Tim Petersc4e09402003-04-25 07:11:48 +00001121class WalkTests(unittest.TestCase):
1122 """Tests for os.walk()."""
1123
Victor Stinner0561c532015-03-12 10:28:24 +01001124 # Wrapper to hide minor differences between os.walk and os.fwalk
1125 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001126 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001127 if 'follow_symlinks' in kwargs:
1128 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001129 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001130
Charles-François Natali7372b062012-02-05 15:15:38 +01001131 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001132 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001133 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001134
1135 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001136 # TESTFN/
1137 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001138 # tmp1
1139 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001140 # tmp2
1141 # SUB11/ no kids
1142 # SUB2/ a file kid and a dirsymlink kid
1143 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001144 # SUB21/ not readable
1145 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001146 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001147 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001148 # broken_link2
1149 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001150 # TEST2/
1151 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001152 self.walk_path = join(support.TESTFN, "TEST1")
1153 self.sub1_path = join(self.walk_path, "SUB1")
1154 self.sub11_path = join(self.sub1_path, "SUB11")
1155 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001156 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001157 tmp1_path = join(self.walk_path, "tmp1")
1158 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001159 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001160 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001161 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001162 t2_path = join(support.TESTFN, "TEST2")
1163 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001164 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001165 broken_link2_path = join(sub2_path, "broken_link2")
1166 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001167
1168 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001169 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001170 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001171 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001172 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001173
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001174 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001175 with open(path, "x") as f:
1176 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001177
Victor Stinner0561c532015-03-12 10:28:24 +01001178 if support.can_symlink():
1179 os.symlink(os.path.abspath(t2_path), self.link_path)
1180 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001181 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1182 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001183 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001184 ["broken_link", "broken_link2", "broken_link3",
1185 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001186 else:
pxinwr3e028b22019-02-15 13:04:47 +08001187 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001188
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001189 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001190 try:
1191 os.listdir(sub21_path)
1192 except PermissionError:
1193 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1194 else:
1195 os.chmod(sub21_path, stat.S_IRWXU)
1196 os.unlink(tmp5_path)
1197 os.rmdir(sub21_path)
1198 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001199
Victor Stinner0561c532015-03-12 10:28:24 +01001200 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001201 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001202 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001203
Tim Petersc4e09402003-04-25 07:11:48 +00001204 self.assertEqual(len(all), 4)
1205 # We can't know which order SUB1 and SUB2 will appear in.
1206 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1207 # flipped: TESTFN, SUB2, SUB1, SUB11
1208 flipped = all[0][1][0] != "SUB1"
1209 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001210 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001211 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001212 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1213 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1214 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1215 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001216
Brett Cannon3f9183b2016-08-26 14:44:48 -07001217 def test_walk_prune(self, walk_path=None):
1218 if walk_path is None:
1219 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001220 # Prune the search.
1221 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001222 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001223 all.append((root, dirs, files))
1224 # Don't descend into SUB1.
1225 if 'SUB1' in dirs:
1226 # Note that this also mutates the dirs we appended to all!
1227 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001228
Victor Stinner0561c532015-03-12 10:28:24 +01001229 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001230 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001231
1232 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001233 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001234 self.assertEqual(all[1], self.sub2_tree)
1235
Brett Cannon3f9183b2016-08-26 14:44:48 -07001236 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001237 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001238
Victor Stinner0561c532015-03-12 10:28:24 +01001239 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001240 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001241 all = list(self.walk(self.walk_path, topdown=False))
1242
Victor Stinner53b0a412016-03-26 01:12:36 +01001243 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001244 # We can't know which order SUB1 and SUB2 will appear in.
1245 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1246 # flipped: SUB2, SUB11, SUB1, TESTFN
1247 flipped = all[3][1][0] != "SUB1"
1248 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001249 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001250 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001251 self.assertEqual(all[3],
1252 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1253 self.assertEqual(all[flipped],
1254 (self.sub11_path, [], []))
1255 self.assertEqual(all[flipped + 1],
1256 (self.sub1_path, ["SUB11"], ["tmp2"]))
1257 self.assertEqual(all[2 - 2 * flipped],
1258 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001259
Victor Stinner0561c532015-03-12 10:28:24 +01001260 def test_walk_symlink(self):
1261 if not support.can_symlink():
1262 self.skipTest("need symlink support")
1263
1264 # Walk, following symlinks.
1265 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1266 for root, dirs, files in walk_it:
1267 if root == self.link_path:
1268 self.assertEqual(dirs, [])
1269 self.assertEqual(files, ["tmp4"])
1270 break
1271 else:
1272 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001273
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001274 def test_walk_bad_dir(self):
1275 # Walk top-down.
1276 errors = []
1277 walk_it = self.walk(self.walk_path, onerror=errors.append)
1278 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001279 self.assertEqual(errors, [])
1280 dir1 = 'SUB1'
1281 path1 = os.path.join(root, dir1)
1282 path1new = os.path.join(root, dir1 + '.new')
1283 os.rename(path1, path1new)
1284 try:
1285 roots = [r for r, d, f in walk_it]
1286 self.assertTrue(errors)
1287 self.assertNotIn(path1, roots)
1288 self.assertNotIn(path1new, roots)
1289 for dir2 in dirs:
1290 if dir2 != dir1:
1291 self.assertIn(os.path.join(root, dir2), roots)
1292 finally:
1293 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001294
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001295 def test_walk_many_open_files(self):
1296 depth = 30
1297 base = os.path.join(support.TESTFN, 'deep')
1298 p = os.path.join(base, *(['d']*depth))
1299 os.makedirs(p)
1300
1301 iters = [self.walk(base, topdown=False) for j in range(100)]
1302 for i in range(depth + 1):
1303 expected = (p, ['d'] if i else [], [])
1304 for it in iters:
1305 self.assertEqual(next(it), expected)
1306 p = os.path.dirname(p)
1307
1308 iters = [self.walk(base, topdown=True) for j in range(100)]
1309 p = base
1310 for i in range(depth + 1):
1311 expected = (p, ['d'] if i < depth else [], [])
1312 for it in iters:
1313 self.assertEqual(next(it), expected)
1314 p = os.path.join(p, 'd')
1315
Charles-François Natali7372b062012-02-05 15:15:38 +01001316
1317@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1318class FwalkTests(WalkTests):
1319 """Tests for os.fwalk()."""
1320
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001321 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001322 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001323 yield (root, dirs, files)
1324
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001325 def fwalk(self, *args, **kwargs):
1326 return os.fwalk(*args, **kwargs)
1327
Larry Hastingsc48fe982012-06-25 04:49:05 -07001328 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1329 """
1330 compare with walk() results.
1331 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001332 walk_kwargs = walk_kwargs.copy()
1333 fwalk_kwargs = fwalk_kwargs.copy()
1334 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1335 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1336 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001337
Charles-François Natali7372b062012-02-05 15:15:38 +01001338 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001339 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001340 expected[root] = (set(dirs), set(files))
1341
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001342 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001343 self.assertIn(root, expected)
1344 self.assertEqual(expected[root], (set(dirs), set(files)))
1345
Larry Hastingsc48fe982012-06-25 04:49:05 -07001346 def test_compare_to_walk(self):
1347 kwargs = {'top': support.TESTFN}
1348 self._compare_to_walk(kwargs, kwargs)
1349
Charles-François Natali7372b062012-02-05 15:15:38 +01001350 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001351 try:
1352 fd = os.open(".", os.O_RDONLY)
1353 walk_kwargs = {'top': support.TESTFN}
1354 fwalk_kwargs = walk_kwargs.copy()
1355 fwalk_kwargs['dir_fd'] = fd
1356 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1357 finally:
1358 os.close(fd)
1359
1360 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001361 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001362 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1363 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001364 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001365 # check that the FD is valid
1366 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001367 # redundant check
1368 os.stat(rootfd)
1369 # check that listdir() returns consistent information
1370 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001371
1372 def test_fd_leak(self):
1373 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1374 # we both check that calling fwalk() a large number of times doesn't
1375 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1376 minfd = os.dup(1)
1377 os.close(minfd)
1378 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001379 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001380 pass
1381 newfd = os.dup(1)
1382 self.addCleanup(os.close, newfd)
1383 self.assertEqual(newfd, minfd)
1384
Serhiy Storchakaf9dc2ad2019-09-12 15:54:48 +03001385 # fwalk() keeps file descriptors open
1386 test_walk_many_open_files = None
1387
1388
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001389class BytesWalkTests(WalkTests):
1390 """Tests for os.walk() with bytes."""
1391 def walk(self, top, **kwargs):
1392 if 'follow_symlinks' in kwargs:
1393 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1394 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1395 root = os.fsdecode(broot)
1396 dirs = list(map(os.fsdecode, bdirs))
1397 files = list(map(os.fsdecode, bfiles))
1398 yield (root, dirs, files)
1399 bdirs[:] = list(map(os.fsencode, dirs))
1400 bfiles[:] = list(map(os.fsencode, files))
1401
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001402@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1403class BytesFwalkTests(FwalkTests):
1404 """Tests for os.walk() with bytes."""
1405 def fwalk(self, top='.', *args, **kwargs):
1406 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1407 root = os.fsdecode(broot)
1408 dirs = list(map(os.fsdecode, bdirs))
1409 files = list(map(os.fsdecode, bfiles))
1410 yield (root, dirs, files, topfd)
1411 bdirs[:] = list(map(os.fsencode, dirs))
1412 bfiles[:] = list(map(os.fsencode, files))
1413
Charles-François Natali7372b062012-02-05 15:15:38 +01001414
Guido van Rossume7ba4952007-06-06 23:52:48 +00001415class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001416 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001417 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001418
1419 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001420 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001421 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1422 os.makedirs(path) # Should work
1423 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1424 os.makedirs(path)
1425
1426 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001427 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001428 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1429 os.makedirs(path)
1430 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1431 'dir5', 'dir6')
1432 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001433
Serhiy Storchakae304e332017-03-24 13:27:42 +02001434 def test_mode(self):
1435 with support.temp_umask(0o002):
1436 base = support.TESTFN
1437 parent = os.path.join(base, 'dir1')
1438 path = os.path.join(parent, 'dir2')
1439 os.makedirs(path, 0o555)
1440 self.assertTrue(os.path.exists(path))
1441 self.assertTrue(os.path.isdir(path))
1442 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001443 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1444 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001445
Terry Reedy5a22b652010-12-02 07:05:56 +00001446 def test_exist_ok_existing_directory(self):
1447 path = os.path.join(support.TESTFN, 'dir1')
1448 mode = 0o777
1449 old_mask = os.umask(0o022)
1450 os.makedirs(path, mode)
1451 self.assertRaises(OSError, os.makedirs, path, mode)
1452 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001453 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001454 os.makedirs(path, mode=mode, exist_ok=True)
1455 os.umask(old_mask)
1456
Martin Pantera82642f2015-11-19 04:48:44 +00001457 # Issue #25583: A drive root could raise PermissionError on Windows
1458 os.makedirs(os.path.abspath('/'), exist_ok=True)
1459
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001460 def test_exist_ok_s_isgid_directory(self):
1461 path = os.path.join(support.TESTFN, 'dir1')
1462 S_ISGID = stat.S_ISGID
1463 mode = 0o777
1464 old_mask = os.umask(0o022)
1465 try:
1466 existing_testfn_mode = stat.S_IMODE(
1467 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001468 try:
1469 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001470 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001471 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001472 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1473 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1474 # The os should apply S_ISGID from the parent dir for us, but
1475 # this test need not depend on that behavior. Be explicit.
1476 os.makedirs(path, mode | S_ISGID)
1477 # http://bugs.python.org/issue14992
1478 # Should not fail when the bit is already set.
1479 os.makedirs(path, mode, exist_ok=True)
1480 # remove the bit.
1481 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001482 # May work even when the bit is not already set when demanded.
1483 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001484 finally:
1485 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001486
1487 def test_exist_ok_existing_regular_file(self):
1488 base = support.TESTFN
1489 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001490 with open(path, 'w') as f:
1491 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001492 self.assertRaises(OSError, os.makedirs, path)
1493 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1494 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1495 os.remove(path)
1496
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001497 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001498 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001499 'dir4', 'dir5', 'dir6')
1500 # If the tests failed, the bottom-most directory ('../dir6')
1501 # may not have been created, so we look for the outermost directory
1502 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001503 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001504 path = os.path.dirname(path)
1505
1506 os.removedirs(path)
1507
Andrew Svetlov405faed2012-12-25 12:18:09 +02001508
R David Murrayf2ad1732014-12-25 18:36:56 -05001509@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1510class ChownFileTests(unittest.TestCase):
1511
Berker Peksag036a71b2015-07-21 09:29:48 +03001512 @classmethod
1513 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001514 os.mkdir(support.TESTFN)
1515
1516 def test_chown_uid_gid_arguments_must_be_index(self):
1517 stat = os.stat(support.TESTFN)
1518 uid = stat.st_uid
1519 gid = stat.st_gid
1520 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1521 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1522 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1523 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1524 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1525
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001526 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1527 def test_chown_gid(self):
1528 groups = os.getgroups()
1529 if len(groups) < 2:
1530 self.skipTest("test needs at least 2 groups")
1531
R David Murrayf2ad1732014-12-25 18:36:56 -05001532 gid_1, gid_2 = groups[:2]
1533 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001534
R David Murrayf2ad1732014-12-25 18:36:56 -05001535 os.chown(support.TESTFN, uid, gid_1)
1536 gid = os.stat(support.TESTFN).st_gid
1537 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001538
R David Murrayf2ad1732014-12-25 18:36:56 -05001539 os.chown(support.TESTFN, uid, gid_2)
1540 gid = os.stat(support.TESTFN).st_gid
1541 self.assertEqual(gid, gid_2)
1542
1543 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1544 "test needs root privilege and more than one user")
1545 def test_chown_with_root(self):
1546 uid_1, uid_2 = all_users[:2]
1547 gid = os.stat(support.TESTFN).st_gid
1548 os.chown(support.TESTFN, uid_1, gid)
1549 uid = os.stat(support.TESTFN).st_uid
1550 self.assertEqual(uid, uid_1)
1551 os.chown(support.TESTFN, uid_2, gid)
1552 uid = os.stat(support.TESTFN).st_uid
1553 self.assertEqual(uid, uid_2)
1554
1555 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1556 "test needs non-root account and more than one user")
1557 def test_chown_without_permission(self):
1558 uid_1, uid_2 = all_users[:2]
1559 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001560 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001561 os.chown(support.TESTFN, uid_1, gid)
1562 os.chown(support.TESTFN, uid_2, gid)
1563
Berker Peksag036a71b2015-07-21 09:29:48 +03001564 @classmethod
1565 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001566 os.rmdir(support.TESTFN)
1567
1568
Andrew Svetlov405faed2012-12-25 12:18:09 +02001569class RemoveDirsTests(unittest.TestCase):
1570 def setUp(self):
1571 os.makedirs(support.TESTFN)
1572
1573 def tearDown(self):
1574 support.rmtree(support.TESTFN)
1575
1576 def test_remove_all(self):
1577 dira = os.path.join(support.TESTFN, 'dira')
1578 os.mkdir(dira)
1579 dirb = os.path.join(dira, 'dirb')
1580 os.mkdir(dirb)
1581 os.removedirs(dirb)
1582 self.assertFalse(os.path.exists(dirb))
1583 self.assertFalse(os.path.exists(dira))
1584 self.assertFalse(os.path.exists(support.TESTFN))
1585
1586 def test_remove_partial(self):
1587 dira = os.path.join(support.TESTFN, 'dira')
1588 os.mkdir(dira)
1589 dirb = os.path.join(dira, 'dirb')
1590 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001591 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001592 os.removedirs(dirb)
1593 self.assertFalse(os.path.exists(dirb))
1594 self.assertTrue(os.path.exists(dira))
1595 self.assertTrue(os.path.exists(support.TESTFN))
1596
1597 def test_remove_nothing(self):
1598 dira = os.path.join(support.TESTFN, 'dira')
1599 os.mkdir(dira)
1600 dirb = os.path.join(dira, 'dirb')
1601 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001602 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001603 with self.assertRaises(OSError):
1604 os.removedirs(dirb)
1605 self.assertTrue(os.path.exists(dirb))
1606 self.assertTrue(os.path.exists(dira))
1607 self.assertTrue(os.path.exists(support.TESTFN))
1608
1609
Guido van Rossume7ba4952007-06-06 23:52:48 +00001610class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001611 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001612 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001613 f.write(b'hello')
1614 f.close()
1615 with open(os.devnull, 'rb') as f:
1616 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001617
Andrew Svetlov405faed2012-12-25 12:18:09 +02001618
Guido van Rossume7ba4952007-06-06 23:52:48 +00001619class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001620 def test_urandom_length(self):
1621 self.assertEqual(len(os.urandom(0)), 0)
1622 self.assertEqual(len(os.urandom(1)), 1)
1623 self.assertEqual(len(os.urandom(10)), 10)
1624 self.assertEqual(len(os.urandom(100)), 100)
1625 self.assertEqual(len(os.urandom(1000)), 1000)
1626
1627 def test_urandom_value(self):
1628 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001629 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001630 data2 = os.urandom(16)
1631 self.assertNotEqual(data1, data2)
1632
1633 def get_urandom_subprocess(self, count):
1634 code = '\n'.join((
1635 'import os, sys',
1636 'data = os.urandom(%s)' % count,
1637 'sys.stdout.buffer.write(data)',
1638 'sys.stdout.buffer.flush()'))
1639 out = assert_python_ok('-c', code)
1640 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001641 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001642 return stdout
1643
1644 def test_urandom_subprocess(self):
1645 data1 = self.get_urandom_subprocess(16)
1646 data2 = self.get_urandom_subprocess(16)
1647 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001648
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001649
Victor Stinner9b1f4742016-09-06 16:18:52 -07001650@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1651class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001652 @classmethod
1653 def setUpClass(cls):
1654 try:
1655 os.getrandom(1)
1656 except OSError as exc:
1657 if exc.errno == errno.ENOSYS:
1658 # Python compiled on a more recent Linux version
1659 # than the current Linux kernel
1660 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1661 else:
1662 raise
1663
Victor Stinner9b1f4742016-09-06 16:18:52 -07001664 def test_getrandom_type(self):
1665 data = os.getrandom(16)
1666 self.assertIsInstance(data, bytes)
1667 self.assertEqual(len(data), 16)
1668
1669 def test_getrandom0(self):
1670 empty = os.getrandom(0)
1671 self.assertEqual(empty, b'')
1672
1673 def test_getrandom_random(self):
1674 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1675
1676 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1677 # resource /dev/random
1678
1679 def test_getrandom_nonblock(self):
1680 # The call must not fail. Check also that the flag exists
1681 try:
1682 os.getrandom(1, os.GRND_NONBLOCK)
1683 except BlockingIOError:
1684 # System urandom is not initialized yet
1685 pass
1686
1687 def test_getrandom_value(self):
1688 data1 = os.getrandom(16)
1689 data2 = os.getrandom(16)
1690 self.assertNotEqual(data1, data2)
1691
1692
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001693# os.urandom() doesn't use a file descriptor when it is implemented with the
1694# getentropy() function, the getrandom() function or the getrandom() syscall
1695OS_URANDOM_DONT_USE_FD = (
1696 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1697 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1698 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001699
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001700@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1701 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001702@unittest.skipIf(sys.platform == "vxworks",
1703 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001704class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001705 @unittest.skipUnless(resource, "test requires the resource module")
1706 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001707 # Check urandom() failing when it is not able to open /dev/random.
1708 # We spawn a new process to make the test more robust (if getrlimit()
1709 # failed to restore the file descriptor limit after this, the whole
1710 # test suite would crash; this actually happened on the OS X Tiger
1711 # buildbot).
1712 code = """if 1:
1713 import errno
1714 import os
1715 import resource
1716
1717 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1718 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1719 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001720 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001721 except OSError as e:
1722 assert e.errno == errno.EMFILE, e.errno
1723 else:
1724 raise AssertionError("OSError not raised")
1725 """
1726 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001727
Antoine Pitroue472aea2014-04-26 14:33:03 +02001728 def test_urandom_fd_closed(self):
1729 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1730 # closed.
1731 code = """if 1:
1732 import os
1733 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001734 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001735 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001736 with test.support.SuppressCrashReport():
1737 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001738 sys.stdout.buffer.write(os.urandom(4))
1739 """
1740 rc, out, err = assert_python_ok('-Sc', code)
1741
1742 def test_urandom_fd_reopened(self):
1743 # Issue #21207: urandom() should detect its fd to /dev/urandom
1744 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001745 self.addCleanup(support.unlink, support.TESTFN)
1746 create_file(support.TESTFN, b"x" * 256)
1747
Antoine Pitroue472aea2014-04-26 14:33:03 +02001748 code = """if 1:
1749 import os
1750 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001751 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001752 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001753 with test.support.SuppressCrashReport():
1754 for fd in range(3, 256):
1755 try:
1756 os.close(fd)
1757 except OSError:
1758 pass
1759 else:
1760 # Found the urandom fd (XXX hopefully)
1761 break
1762 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001763 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001764 new_fd = f.fileno()
1765 # Issue #26935: posix allows new_fd and fd to be equal but
1766 # some libc implementations have dup2 return an error in this
1767 # case.
1768 if new_fd != fd:
1769 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001770 sys.stdout.buffer.write(os.urandom(4))
1771 sys.stdout.buffer.write(os.urandom(4))
1772 """.format(TESTFN=support.TESTFN)
1773 rc, out, err = assert_python_ok('-Sc', code)
1774 self.assertEqual(len(out), 8)
1775 self.assertNotEqual(out[0:4], out[4:8])
1776 rc, out2, err2 = assert_python_ok('-Sc', code)
1777 self.assertEqual(len(out2), 8)
1778 self.assertNotEqual(out2, out)
1779
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001780
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001781@contextlib.contextmanager
1782def _execvpe_mockup(defpath=None):
1783 """
1784 Stubs out execv and execve functions when used as context manager.
1785 Records exec calls. The mock execv and execve functions always raise an
1786 exception as they would normally never return.
1787 """
1788 # A list of tuples containing (function name, first arg, args)
1789 # of calls to execv or execve that have been made.
1790 calls = []
1791
1792 def mock_execv(name, *args):
1793 calls.append(('execv', name, args))
1794 raise RuntimeError("execv called")
1795
1796 def mock_execve(name, *args):
1797 calls.append(('execve', name, args))
1798 raise OSError(errno.ENOTDIR, "execve called")
1799
1800 try:
1801 orig_execv = os.execv
1802 orig_execve = os.execve
1803 orig_defpath = os.defpath
1804 os.execv = mock_execv
1805 os.execve = mock_execve
1806 if defpath is not None:
1807 os.defpath = defpath
1808 yield calls
1809 finally:
1810 os.execv = orig_execv
1811 os.execve = orig_execve
1812 os.defpath = orig_defpath
1813
pxinwrf2d7ac72019-05-21 18:46:37 +08001814@unittest.skipUnless(hasattr(os, 'execv'),
1815 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001816class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001817 @unittest.skipIf(USING_LINUXTHREADS,
1818 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001819 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001820 self.assertRaises(OSError, os.execvpe, 'no such app-',
1821 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001822
Steve Dowerbce26262016-11-19 19:17:26 -08001823 def test_execv_with_bad_arglist(self):
1824 self.assertRaises(ValueError, os.execv, 'notepad', ())
1825 self.assertRaises(ValueError, os.execv, 'notepad', [])
1826 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1827 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1828
Thomas Heller6790d602007-08-30 17:15:14 +00001829 def test_execvpe_with_bad_arglist(self):
1830 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001831 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1832 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001833
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001834 @unittest.skipUnless(hasattr(os, '_execvpe'),
1835 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001836 def _test_internal_execvpe(self, test_type):
1837 program_path = os.sep + 'absolutepath'
1838 if test_type is bytes:
1839 program = b'executable'
1840 fullpath = os.path.join(os.fsencode(program_path), program)
1841 native_fullpath = fullpath
1842 arguments = [b'progname', 'arg1', 'arg2']
1843 else:
1844 program = 'executable'
1845 arguments = ['progname', 'arg1', 'arg2']
1846 fullpath = os.path.join(program_path, program)
1847 if os.name != "nt":
1848 native_fullpath = os.fsencode(fullpath)
1849 else:
1850 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001851 env = {'spam': 'beans'}
1852
Victor Stinnerb745a742010-05-18 17:17:23 +00001853 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001854 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001855 self.assertRaises(RuntimeError,
1856 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001857 self.assertEqual(len(calls), 1)
1858 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1859
Victor Stinnerb745a742010-05-18 17:17:23 +00001860 # test os._execvpe() with a relative path:
1861 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001862 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001863 self.assertRaises(OSError,
1864 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001865 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001866 self.assertSequenceEqual(calls[0],
1867 ('execve', native_fullpath, (arguments, env)))
1868
1869 # test os._execvpe() with a relative path:
1870 # os.get_exec_path() reads the 'PATH' variable
1871 with _execvpe_mockup() as calls:
1872 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001873 if test_type is bytes:
1874 env_path[b'PATH'] = program_path
1875 else:
1876 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001877 self.assertRaises(OSError,
1878 os._execvpe, program, arguments, env=env_path)
1879 self.assertEqual(len(calls), 1)
1880 self.assertSequenceEqual(calls[0],
1881 ('execve', native_fullpath, (arguments, env_path)))
1882
1883 def test_internal_execvpe_str(self):
1884 self._test_internal_execvpe(str)
1885 if os.name != "nt":
1886 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001887
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001888 def test_execve_invalid_env(self):
1889 args = [sys.executable, '-c', 'pass']
1890
Ville Skyttä49b27342017-08-03 09:00:59 +03001891 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001892 newenv = os.environ.copy()
1893 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1894 with self.assertRaises(ValueError):
1895 os.execve(args[0], args, newenv)
1896
Ville Skyttä49b27342017-08-03 09:00:59 +03001897 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001898 newenv = os.environ.copy()
1899 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1900 with self.assertRaises(ValueError):
1901 os.execve(args[0], args, newenv)
1902
Ville Skyttä49b27342017-08-03 09:00:59 +03001903 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001904 newenv = os.environ.copy()
1905 newenv["FRUIT=ORANGE"] = "lemon"
1906 with self.assertRaises(ValueError):
1907 os.execve(args[0], args, newenv)
1908
Alexey Izbyshev83460312018-10-20 03:28:22 +03001909 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1910 def test_execve_with_empty_path(self):
1911 # bpo-32890: Check GetLastError() misuse
1912 try:
1913 os.execve('', ['arg'], {})
1914 except OSError as e:
1915 self.assertTrue(e.winerror is None or e.winerror != 0)
1916 else:
1917 self.fail('No OSError raised')
1918
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001919
Serhiy Storchaka43767632013-11-03 21:31:38 +02001920@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001921class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001922 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001923 try:
1924 os.stat(support.TESTFN)
1925 except FileNotFoundError:
1926 exists = False
1927 except OSError as exc:
1928 exists = True
1929 self.fail("file %s must not exist; os.stat failed with %s"
1930 % (support.TESTFN, exc))
1931 else:
1932 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001933
Thomas Wouters477c8d52006-05-27 19:21:47 +00001934 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001935 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001936
1937 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001938 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001939
1940 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001941 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001942
1943 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001944 self.addCleanup(support.unlink, support.TESTFN)
1945
Victor Stinnere77c9742016-03-25 10:28:23 +01001946 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001947 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001948
1949 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001950 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001951
Thomas Wouters477c8d52006-05-27 19:21:47 +00001952 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001953 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001954
Victor Stinnere77c9742016-03-25 10:28:23 +01001955
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001956class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001957 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001958 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1959 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001960 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001961 def get_single(f):
1962 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001963 if hasattr(os, f):
1964 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001965 return helper
1966 for f in singles:
1967 locals()["test_"+f] = get_single(f)
1968
Benjamin Peterson7522c742009-01-19 21:00:09 +00001969 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001970 try:
1971 f(support.make_bad_fd(), *args)
1972 except OSError as e:
1973 self.assertEqual(e.errno, errno.EBADF)
1974 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001975 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001976 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001977
Serhiy Storchaka43767632013-11-03 21:31:38 +02001978 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001979 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001980 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001981
Serhiy Storchaka43767632013-11-03 21:31:38 +02001982 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001983 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001984 fd = support.make_bad_fd()
1985 # Make sure none of the descriptors we are about to close are
1986 # currently valid (issue 6542).
1987 for i in range(10):
1988 try: os.fstat(fd+i)
1989 except OSError:
1990 pass
1991 else:
1992 break
1993 if i < 2:
1994 raise unittest.SkipTest(
1995 "Unable to acquire a range of invalid file descriptors")
1996 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001997
Serhiy Storchaka43767632013-11-03 21:31:38 +02001998 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001999 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002000 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002001
Serhiy Storchaka43767632013-11-03 21:31:38 +02002002 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002003 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002004 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002005
Serhiy Storchaka43767632013-11-03 21:31:38 +02002006 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002007 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002008 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002009
Serhiy Storchaka43767632013-11-03 21:31:38 +02002010 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002011 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002012 self.check(os.pathconf, "PC_NAME_MAX")
2013 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002014
Serhiy Storchaka43767632013-11-03 21:31:38 +02002015 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002016 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002017 self.check(os.truncate, 0)
2018 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002019
Serhiy Storchaka43767632013-11-03 21:31:38 +02002020 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002021 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002023
Serhiy Storchaka43767632013-11-03 21:31:38 +02002024 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002025 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002026 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002027
Victor Stinner57ddf782014-01-08 15:21:28 +01002028 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2029 def test_readv(self):
2030 buf = bytearray(10)
2031 self.check(os.readv, [buf])
2032
Serhiy Storchaka43767632013-11-03 21:31:38 +02002033 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002034 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002035 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002036
Serhiy Storchaka43767632013-11-03 21:31:38 +02002037 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002038 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02002039 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002040
Victor Stinner57ddf782014-01-08 15:21:28 +01002041 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2042 def test_writev(self):
2043 self.check(os.writev, [b'abc'])
2044
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002045 def test_inheritable(self):
2046 self.check(os.get_inheritable)
2047 self.check(os.set_inheritable, True)
2048
2049 @unittest.skipUnless(hasattr(os, 'get_blocking'),
2050 'needs os.get_blocking() and os.set_blocking()')
2051 def test_blocking(self):
2052 self.check(os.get_blocking)
2053 self.check(os.set_blocking, True)
2054
Brian Curtin1b9df392010-11-24 20:24:31 +00002055
2056class LinkTests(unittest.TestCase):
2057 def setUp(self):
2058 self.file1 = support.TESTFN
2059 self.file2 = os.path.join(support.TESTFN + "2")
2060
Brian Curtinc0abc4e2010-11-30 23:46:54 +00002061 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00002062 for file in (self.file1, self.file2):
2063 if os.path.exists(file):
2064 os.unlink(file)
2065
Brian Curtin1b9df392010-11-24 20:24:31 +00002066 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01002067 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00002068
xdegaye6a55d092017-11-12 17:57:04 +01002069 try:
2070 os.link(file1, file2)
2071 except PermissionError as e:
2072 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00002073 with open(file1, "r") as f1, open(file2, "r") as f2:
2074 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2075
2076 def test_link(self):
2077 self._test_link(self.file1, self.file2)
2078
2079 def test_link_bytes(self):
2080 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2081 bytes(self.file2, sys.getfilesystemencoding()))
2082
Brian Curtinf498b752010-11-30 15:54:04 +00002083 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00002084 try:
Brian Curtinf498b752010-11-30 15:54:04 +00002085 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00002086 except UnicodeError:
2087 raise unittest.SkipTest("Unable to encode for this platform.")
2088
Brian Curtinf498b752010-11-30 15:54:04 +00002089 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00002090 self.file2 = self.file1 + "2"
2091 self._test_link(self.file1, self.file2)
2092
Serhiy Storchaka43767632013-11-03 21:31:38 +02002093@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2094class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01002095 # uid_t and gid_t are 32-bit unsigned integers on Linux
2096 UID_OVERFLOW = (1 << 32)
2097 GID_OVERFLOW = (1 << 32)
2098
Serhiy Storchaka43767632013-11-03 21:31:38 +02002099 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2100 def test_setuid(self):
2101 if os.getuid() != 0:
2102 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002103 self.assertRaises(TypeError, os.setuid, 'not an int')
2104 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002105
Serhiy Storchaka43767632013-11-03 21:31:38 +02002106 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2107 def test_setgid(self):
2108 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2109 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002110 self.assertRaises(TypeError, os.setgid, 'not an int')
2111 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002112
Serhiy Storchaka43767632013-11-03 21:31:38 +02002113 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2114 def test_seteuid(self):
2115 if os.getuid() != 0:
2116 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002117 self.assertRaises(TypeError, os.setegid, 'not an int')
2118 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002119
Serhiy Storchaka43767632013-11-03 21:31:38 +02002120 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2121 def test_setegid(self):
2122 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2123 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002124 self.assertRaises(TypeError, os.setegid, 'not an int')
2125 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002126
Serhiy Storchaka43767632013-11-03 21:31:38 +02002127 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2128 def test_setreuid(self):
2129 if os.getuid() != 0:
2130 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002131 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2132 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2133 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2134 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002135
Serhiy Storchaka43767632013-11-03 21:31:38 +02002136 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2137 def test_setreuid_neg1(self):
2138 # Needs to accept -1. We run this in a subprocess to avoid
2139 # altering the test runner's process state (issue8045).
2140 subprocess.check_call([
2141 sys.executable, '-c',
2142 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002143
Serhiy Storchaka43767632013-11-03 21:31:38 +02002144 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2145 def test_setregid(self):
2146 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2147 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002148 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2149 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2150 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2151 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002152
Serhiy Storchaka43767632013-11-03 21:31:38 +02002153 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2154 def test_setregid_neg1(self):
2155 # Needs to accept -1. We run this in a subprocess to avoid
2156 # altering the test runner's process state (issue8045).
2157 subprocess.check_call([
2158 sys.executable, '-c',
2159 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002160
Serhiy Storchaka43767632013-11-03 21:31:38 +02002161@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2162class Pep383Tests(unittest.TestCase):
2163 def setUp(self):
2164 if support.TESTFN_UNENCODABLE:
2165 self.dir = support.TESTFN_UNENCODABLE
2166 elif support.TESTFN_NONASCII:
2167 self.dir = support.TESTFN_NONASCII
2168 else:
2169 self.dir = support.TESTFN
2170 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002171
Serhiy Storchaka43767632013-11-03 21:31:38 +02002172 bytesfn = []
2173 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002174 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002175 fn = os.fsencode(fn)
2176 except UnicodeEncodeError:
2177 return
2178 bytesfn.append(fn)
2179 add_filename(support.TESTFN_UNICODE)
2180 if support.TESTFN_UNENCODABLE:
2181 add_filename(support.TESTFN_UNENCODABLE)
2182 if support.TESTFN_NONASCII:
2183 add_filename(support.TESTFN_NONASCII)
2184 if not bytesfn:
2185 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002186
Serhiy Storchaka43767632013-11-03 21:31:38 +02002187 self.unicodefn = set()
2188 os.mkdir(self.dir)
2189 try:
2190 for fn in bytesfn:
2191 support.create_empty_file(os.path.join(self.bdir, fn))
2192 fn = os.fsdecode(fn)
2193 if fn in self.unicodefn:
2194 raise ValueError("duplicate filename")
2195 self.unicodefn.add(fn)
2196 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002197 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002198 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002199
Serhiy Storchaka43767632013-11-03 21:31:38 +02002200 def tearDown(self):
2201 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002202
Serhiy Storchaka43767632013-11-03 21:31:38 +02002203 def test_listdir(self):
2204 expected = self.unicodefn
2205 found = set(os.listdir(self.dir))
2206 self.assertEqual(found, expected)
2207 # test listdir without arguments
2208 current_directory = os.getcwd()
2209 try:
2210 os.chdir(os.sep)
2211 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2212 finally:
2213 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002214
Serhiy Storchaka43767632013-11-03 21:31:38 +02002215 def test_open(self):
2216 for fn in self.unicodefn:
2217 f = open(os.path.join(self.dir, fn), 'rb')
2218 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002219
Serhiy Storchaka43767632013-11-03 21:31:38 +02002220 @unittest.skipUnless(hasattr(os, 'statvfs'),
2221 "need os.statvfs()")
2222 def test_statvfs(self):
2223 # issue #9645
2224 for fn in self.unicodefn:
2225 # should not fail with file not found error
2226 fullname = os.path.join(self.dir, fn)
2227 os.statvfs(fullname)
2228
2229 def test_stat(self):
2230 for fn in self.unicodefn:
2231 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002232
Brian Curtineb24d742010-04-12 17:16:38 +00002233@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2234class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002235 def _kill(self, sig):
2236 # Start sys.executable as a subprocess and communicate from the
2237 # subprocess to the parent that the interpreter is ready. When it
2238 # becomes ready, send *sig* via os.kill to the subprocess and check
2239 # that the return code is equal to *sig*.
2240 import ctypes
2241 from ctypes import wintypes
2242 import msvcrt
2243
2244 # Since we can't access the contents of the process' stdout until the
2245 # process has exited, use PeekNamedPipe to see what's inside stdout
2246 # without waiting. This is done so we can tell that the interpreter
2247 # is started and running at a point where it could handle a signal.
2248 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2249 PeekNamedPipe.restype = wintypes.BOOL
2250 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2251 ctypes.POINTER(ctypes.c_char), # stdout buf
2252 wintypes.DWORD, # Buffer size
2253 ctypes.POINTER(wintypes.DWORD), # bytes read
2254 ctypes.POINTER(wintypes.DWORD), # bytes avail
2255 ctypes.POINTER(wintypes.DWORD)) # bytes left
2256 msg = "running"
2257 proc = subprocess.Popen([sys.executable, "-c",
2258 "import sys;"
2259 "sys.stdout.write('{}');"
2260 "sys.stdout.flush();"
2261 "input()".format(msg)],
2262 stdout=subprocess.PIPE,
2263 stderr=subprocess.PIPE,
2264 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002265 self.addCleanup(proc.stdout.close)
2266 self.addCleanup(proc.stderr.close)
2267 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002268
2269 count, max = 0, 100
2270 while count < max and proc.poll() is None:
2271 # Create a string buffer to store the result of stdout from the pipe
2272 buf = ctypes.create_string_buffer(len(msg))
2273 # Obtain the text currently in proc.stdout
2274 # Bytes read/avail/left are left as NULL and unused
2275 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2276 buf, ctypes.sizeof(buf), None, None, None)
2277 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2278 if buf.value:
2279 self.assertEqual(msg, buf.value.decode())
2280 break
2281 time.sleep(0.1)
2282 count += 1
2283 else:
2284 self.fail("Did not receive communication from the subprocess")
2285
Brian Curtineb24d742010-04-12 17:16:38 +00002286 os.kill(proc.pid, sig)
2287 self.assertEqual(proc.wait(), sig)
2288
2289 def test_kill_sigterm(self):
2290 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002291 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002292
2293 def test_kill_int(self):
2294 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002295 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002296
2297 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002298 tagname = "test_os_%s" % uuid.uuid1()
2299 m = mmap.mmap(-1, 1, tagname)
2300 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002301 # Run a script which has console control handling enabled.
2302 proc = subprocess.Popen([sys.executable,
2303 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002304 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002305 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2306 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002307 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002308 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002309 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002310 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002311 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002312 count += 1
2313 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002314 # Forcefully kill the process if we weren't able to signal it.
2315 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002316 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002317 os.kill(proc.pid, event)
2318 # proc.send_signal(event) could also be done here.
2319 # Allow time for the signal to be passed and the process to exit.
2320 time.sleep(0.5)
2321 if not proc.poll():
2322 # Forcefully kill the process if we weren't able to signal it.
2323 os.kill(proc.pid, signal.SIGINT)
2324 self.fail("subprocess did not stop on {}".format(name))
2325
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002326 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002327 def test_CTRL_C_EVENT(self):
2328 from ctypes import wintypes
2329 import ctypes
2330
2331 # Make a NULL value by creating a pointer with no argument.
2332 NULL = ctypes.POINTER(ctypes.c_int)()
2333 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2334 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2335 wintypes.BOOL)
2336 SetConsoleCtrlHandler.restype = wintypes.BOOL
2337
2338 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002339 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002340 # by subprocesses.
2341 SetConsoleCtrlHandler(NULL, 0)
2342
2343 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2344
2345 def test_CTRL_BREAK_EVENT(self):
2346 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2347
2348
Brian Curtind40e6f72010-07-08 21:39:08 +00002349@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002350class Win32ListdirTests(unittest.TestCase):
2351 """Test listdir on Windows."""
2352
2353 def setUp(self):
2354 self.created_paths = []
2355 for i in range(2):
2356 dir_name = 'SUB%d' % i
2357 dir_path = os.path.join(support.TESTFN, dir_name)
2358 file_name = 'FILE%d' % i
2359 file_path = os.path.join(support.TESTFN, file_name)
2360 os.makedirs(dir_path)
2361 with open(file_path, 'w') as f:
2362 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2363 self.created_paths.extend([dir_name, file_name])
2364 self.created_paths.sort()
2365
2366 def tearDown(self):
2367 shutil.rmtree(support.TESTFN)
2368
2369 def test_listdir_no_extended_path(self):
2370 """Test when the path is not an "extended" path."""
2371 # unicode
2372 self.assertEqual(
2373 sorted(os.listdir(support.TESTFN)),
2374 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002375
Tim Golden781bbeb2013-10-25 20:24:06 +01002376 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002377 self.assertEqual(
2378 sorted(os.listdir(os.fsencode(support.TESTFN))),
2379 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002380
2381 def test_listdir_extended_path(self):
2382 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002383 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002384 # unicode
2385 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2386 self.assertEqual(
2387 sorted(os.listdir(path)),
2388 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002389
Tim Golden781bbeb2013-10-25 20:24:06 +01002390 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002391 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2392 self.assertEqual(
2393 sorted(os.listdir(path)),
2394 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002395
2396
Berker Peksage0b5b202018-08-15 13:03:41 +03002397@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2398class ReadlinkTests(unittest.TestCase):
2399 filelink = 'readlinktest'
2400 filelink_target = os.path.abspath(__file__)
2401 filelinkb = os.fsencode(filelink)
2402 filelinkb_target = os.fsencode(filelink_target)
2403
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002404 def assertPathEqual(self, left, right):
2405 left = os.path.normcase(left)
2406 right = os.path.normcase(right)
2407 if sys.platform == 'win32':
2408 # Bad practice to blindly strip the prefix as it may be required to
2409 # correctly refer to the file, but we're only comparing paths here.
2410 has_prefix = lambda p: p.startswith(
2411 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2412 if has_prefix(left):
2413 left = left[4:]
2414 if has_prefix(right):
2415 right = right[4:]
2416 self.assertEqual(left, right)
2417
Berker Peksage0b5b202018-08-15 13:03:41 +03002418 def setUp(self):
2419 self.assertTrue(os.path.exists(self.filelink_target))
2420 self.assertTrue(os.path.exists(self.filelinkb_target))
2421 self.assertFalse(os.path.exists(self.filelink))
2422 self.assertFalse(os.path.exists(self.filelinkb))
2423
2424 def test_not_symlink(self):
2425 filelink_target = FakePath(self.filelink_target)
2426 self.assertRaises(OSError, os.readlink, self.filelink_target)
2427 self.assertRaises(OSError, os.readlink, filelink_target)
2428
2429 def test_missing_link(self):
2430 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2431 self.assertRaises(FileNotFoundError, os.readlink,
2432 FakePath('missing-link'))
2433
2434 @support.skip_unless_symlink
2435 def test_pathlike(self):
2436 os.symlink(self.filelink_target, self.filelink)
2437 self.addCleanup(support.unlink, self.filelink)
2438 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002439 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002440
2441 @support.skip_unless_symlink
2442 def test_pathlike_bytes(self):
2443 os.symlink(self.filelinkb_target, self.filelinkb)
2444 self.addCleanup(support.unlink, self.filelinkb)
2445 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002446 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002447 self.assertIsInstance(path, bytes)
2448
2449 @support.skip_unless_symlink
2450 def test_bytes(self):
2451 os.symlink(self.filelinkb_target, self.filelinkb)
2452 self.addCleanup(support.unlink, self.filelinkb)
2453 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002454 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002455 self.assertIsInstance(path, bytes)
2456
2457
Tim Golden781bbeb2013-10-25 20:24:06 +01002458@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002459@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002460class Win32SymlinkTests(unittest.TestCase):
2461 filelink = 'filelinktest'
2462 filelink_target = os.path.abspath(__file__)
2463 dirlink = 'dirlinktest'
2464 dirlink_target = os.path.dirname(filelink_target)
2465 missing_link = 'missing link'
2466
2467 def setUp(self):
2468 assert os.path.exists(self.dirlink_target)
2469 assert os.path.exists(self.filelink_target)
2470 assert not os.path.exists(self.dirlink)
2471 assert not os.path.exists(self.filelink)
2472 assert not os.path.exists(self.missing_link)
2473
2474 def tearDown(self):
2475 if os.path.exists(self.filelink):
2476 os.remove(self.filelink)
2477 if os.path.exists(self.dirlink):
2478 os.rmdir(self.dirlink)
2479 if os.path.lexists(self.missing_link):
2480 os.remove(self.missing_link)
2481
2482 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002483 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002484 self.assertTrue(os.path.exists(self.dirlink))
2485 self.assertTrue(os.path.isdir(self.dirlink))
2486 self.assertTrue(os.path.islink(self.dirlink))
2487 self.check_stat(self.dirlink, self.dirlink_target)
2488
2489 def test_file_link(self):
2490 os.symlink(self.filelink_target, self.filelink)
2491 self.assertTrue(os.path.exists(self.filelink))
2492 self.assertTrue(os.path.isfile(self.filelink))
2493 self.assertTrue(os.path.islink(self.filelink))
2494 self.check_stat(self.filelink, self.filelink_target)
2495
2496 def _create_missing_dir_link(self):
2497 'Create a "directory" link to a non-existent target'
2498 linkname = self.missing_link
2499 if os.path.lexists(linkname):
2500 os.remove(linkname)
2501 target = r'c:\\target does not exist.29r3c740'
2502 assert not os.path.exists(target)
2503 target_is_dir = True
2504 os.symlink(target, linkname, target_is_dir)
2505
2506 def test_remove_directory_link_to_missing_target(self):
2507 self._create_missing_dir_link()
2508 # For compatibility with Unix, os.remove will check the
2509 # directory status and call RemoveDirectory if the symlink
2510 # was created with target_is_dir==True.
2511 os.remove(self.missing_link)
2512
Brian Curtind40e6f72010-07-08 21:39:08 +00002513 def test_isdir_on_directory_link_to_missing_target(self):
2514 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002515 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002516
Brian Curtind40e6f72010-07-08 21:39:08 +00002517 def test_rmdir_on_directory_link_to_missing_target(self):
2518 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002519 os.rmdir(self.missing_link)
2520
2521 def check_stat(self, link, target):
2522 self.assertEqual(os.stat(link), os.stat(target))
2523 self.assertNotEqual(os.lstat(link), os.stat(link))
2524
Brian Curtind25aef52011-06-13 15:16:04 -05002525 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002526 self.assertEqual(os.stat(bytes_link), os.stat(target))
2527 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002528
2529 def test_12084(self):
2530 level1 = os.path.abspath(support.TESTFN)
2531 level2 = os.path.join(level1, "level2")
2532 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002533 self.addCleanup(support.rmtree, level1)
2534
2535 os.mkdir(level1)
2536 os.mkdir(level2)
2537 os.mkdir(level3)
2538
2539 file1 = os.path.abspath(os.path.join(level1, "file1"))
2540 create_file(file1)
2541
2542 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002543 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002544 os.chdir(level2)
2545 link = os.path.join(level2, "link")
2546 os.symlink(os.path.relpath(file1), "link")
2547 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002548
Victor Stinnerae39d232016-03-24 17:12:55 +01002549 # Check os.stat calls from the same dir as the link
2550 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002551
Victor Stinnerae39d232016-03-24 17:12:55 +01002552 # Check os.stat calls from a dir below the link
2553 os.chdir(level1)
2554 self.assertEqual(os.stat(file1),
2555 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002556
Victor Stinnerae39d232016-03-24 17:12:55 +01002557 # Check os.stat calls from a dir above the link
2558 os.chdir(level3)
2559 self.assertEqual(os.stat(file1),
2560 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002561 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002562 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002563
SSE43c34aad2018-02-13 00:10:35 +07002564 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2565 and os.path.exists(r'C:\ProgramData'),
2566 'Test directories not found')
2567 def test_29248(self):
2568 # os.symlink() calls CreateSymbolicLink, which creates
2569 # the reparse data buffer with the print name stored
2570 # first, so the offset is always 0. CreateSymbolicLink
2571 # stores the "PrintName" DOS path (e.g. "C:\") first,
2572 # with an offset of 0, followed by the "SubstituteName"
2573 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2574 # the other hand, seems to have been created manually
2575 # with an inverted order.
2576 target = os.readlink(r'C:\Users\All Users')
2577 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2578
Steve Dower6921e732018-03-05 14:26:08 -08002579 def test_buffer_overflow(self):
2580 # Older versions would have a buffer overflow when detecting
2581 # whether a link source was a directory. This test ensures we
2582 # no longer crash, but does not otherwise validate the behavior
2583 segment = 'X' * 27
2584 path = os.path.join(*[segment] * 10)
2585 test_cases = [
2586 # overflow with absolute src
2587 ('\\' + path, segment),
2588 # overflow dest with relative src
2589 (segment, path),
2590 # overflow when joining src
2591 (path[:180], path[:180]),
2592 ]
2593 for src, dest in test_cases:
2594 try:
2595 os.symlink(src, dest)
2596 except FileNotFoundError:
2597 pass
2598 else:
2599 try:
2600 os.remove(dest)
2601 except OSError:
2602 pass
2603 # Also test with bytes, since that is a separate code path.
2604 try:
2605 os.symlink(os.fsencode(src), os.fsencode(dest))
2606 except FileNotFoundError:
2607 pass
2608 else:
2609 try:
2610 os.remove(dest)
2611 except OSError:
2612 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002613
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002614 def test_appexeclink(self):
2615 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002616 if not os.path.isdir(root):
2617 self.skipTest("test requires a WindowsApps directory")
2618
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002619 aliases = [os.path.join(root, a)
2620 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2621
2622 for alias in aliases:
2623 if support.verbose:
2624 print()
2625 print("Testing with", alias)
2626 st = os.lstat(alias)
2627 self.assertEqual(st, os.stat(alias))
2628 self.assertFalse(stat.S_ISLNK(st.st_mode))
2629 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2630 # testing the first one we see is sufficient
2631 break
2632 else:
2633 self.skipTest("test requires an app execution alias")
2634
Tim Golden0321cf22014-05-05 19:46:17 +01002635@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2636class Win32JunctionTests(unittest.TestCase):
2637 junction = 'junctiontest'
2638 junction_target = os.path.dirname(os.path.abspath(__file__))
2639
2640 def setUp(self):
2641 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002642 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002643
2644 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002645 if os.path.lexists(self.junction):
2646 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002647
2648 def test_create_junction(self):
2649 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002650 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002651 self.assertTrue(os.path.exists(self.junction))
2652 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002653 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2654 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002655
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002656 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002657 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002658 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2659 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002660
2661 def test_unlink_removes_junction(self):
2662 _winapi.CreateJunction(self.junction_target, self.junction)
2663 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002664 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002665
2666 os.unlink(self.junction)
2667 self.assertFalse(os.path.exists(self.junction))
2668
Mark Becwarb82bfac2019-02-02 16:08:23 -05002669@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2670class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002671 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002672 nt = support.import_module('nt')
2673 ctypes = support.import_module('ctypes')
2674 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002675
2676 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2677 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2678
2679 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2680 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2681 ctypes.wintypes.LPDWORD)
2682
2683 # This is a pseudo-handle that doesn't need to be closed
2684 hproc = kernel.GetCurrentProcess()
2685
2686 handle_count = ctypes.wintypes.DWORD()
2687 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2688 self.assertEqual(1, ok)
2689
2690 before_count = handle_count.value
2691
2692 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002693 filenames = [
2694 r'\\?\C:',
2695 r'\\?\NUL',
2696 r'\\?\CONIN',
2697 __file__,
2698 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002699
Berker Peksag6ef726a2019-04-22 18:46:28 +03002700 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002701 for name in filenames:
2702 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002703 nt._getfinalpathname(name)
2704 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002705 # Failure is expected
2706 pass
2707 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002708 os.stat(name)
2709 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002710 pass
2711
2712 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2713 self.assertEqual(1, ok)
2714
2715 handle_delta = handle_count.value - before_count
2716
2717 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002718
Jason R. Coombs3a092862013-05-27 23:21:28 -04002719@support.skip_unless_symlink
2720class NonLocalSymlinkTests(unittest.TestCase):
2721
2722 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002723 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002724 Create this structure:
2725
2726 base
2727 \___ some_dir
2728 """
2729 os.makedirs('base/some_dir')
2730
2731 def tearDown(self):
2732 shutil.rmtree('base')
2733
2734 def test_directory_link_nonlocal(self):
2735 """
2736 The symlink target should resolve relative to the link, not relative
2737 to the current directory.
2738
2739 Then, link base/some_link -> base/some_dir and ensure that some_link
2740 is resolved as a directory.
2741
2742 In issue13772, it was discovered that directory detection failed if
2743 the symlink target was not specified relative to the current
2744 directory, which was a defect in the implementation.
2745 """
2746 src = os.path.join('base', 'some_link')
2747 os.symlink('some_dir', src)
2748 assert os.path.isdir(src)
2749
2750
Victor Stinnere8d51452010-08-19 01:05:19 +00002751class FSEncodingTests(unittest.TestCase):
2752 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002753 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2754 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002755
Victor Stinnere8d51452010-08-19 01:05:19 +00002756 def test_identity(self):
2757 # assert fsdecode(fsencode(x)) == x
2758 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2759 try:
2760 bytesfn = os.fsencode(fn)
2761 except UnicodeEncodeError:
2762 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002763 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002764
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002765
Brett Cannonefb00c02012-02-29 18:31:31 -05002766
2767class DeviceEncodingTests(unittest.TestCase):
2768
2769 def test_bad_fd(self):
2770 # Return None when an fd doesn't actually exist.
2771 self.assertIsNone(os.device_encoding(123456))
2772
Paul Monson62dfd7d2019-04-25 11:36:45 -07002773 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002774 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002775 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002776 def test_device_encoding(self):
2777 encoding = os.device_encoding(0)
2778 self.assertIsNotNone(encoding)
2779 self.assertTrue(codecs.lookup(encoding))
2780
2781
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002782class PidTests(unittest.TestCase):
2783 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2784 def test_getppid(self):
2785 p = subprocess.Popen([sys.executable, '-c',
2786 'import os; print(os.getppid())'],
2787 stdout=subprocess.PIPE)
2788 stdout, _ = p.communicate()
2789 # We are the parent of our subprocess
2790 self.assertEqual(int(stdout), os.getpid())
2791
Victor Stinner9bee32b2020-04-22 16:30:35 +02002792 def check_waitpid(self, code, exitcode, callback=None):
2793 if sys.platform == 'win32':
2794 # On Windows, os.spawnv() simply joins arguments with spaces:
2795 # arguments need to be quoted
2796 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2797 else:
2798 args = [sys.executable, '-c', code]
2799 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002800
Victor Stinner9bee32b2020-04-22 16:30:35 +02002801 if callback is not None:
2802 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002803
Victor Stinner9bee32b2020-04-22 16:30:35 +02002804 # don't use support.wait_process() to test directly os.waitpid()
2805 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002806 pid2, status = os.waitpid(pid, 0)
2807 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2808 self.assertEqual(pid2, pid)
2809
Victor Stinner9bee32b2020-04-22 16:30:35 +02002810 def test_waitpid(self):
2811 self.check_waitpid(code='pass', exitcode=0)
2812
2813 def test_waitstatus_to_exitcode(self):
2814 exitcode = 23
2815 code = f'import sys; sys.exit({exitcode})'
2816 self.check_waitpid(code, exitcode=exitcode)
2817
2818 with self.assertRaises(TypeError):
2819 os.waitstatus_to_exitcode(0.0)
2820
2821 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2822 def test_waitpid_windows(self):
2823 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2824 # with exit code larger than INT_MAX.
2825 STATUS_CONTROL_C_EXIT = 0xC000013A
2826 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2827 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2828
2829 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2830 def test_waitstatus_to_exitcode_windows(self):
2831 max_exitcode = 2 ** 32 - 1
2832 for exitcode in (0, 1, 5, max_exitcode):
2833 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2834 exitcode)
2835
2836 # invalid values
2837 with self.assertRaises(ValueError):
2838 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2839 with self.assertRaises(OverflowError):
2840 os.waitstatus_to_exitcode(-1)
2841
Victor Stinner65a796e2020-04-01 18:49:29 +02002842 # Skip the test on Windows
2843 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2844 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002845 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002846 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002847
Victor Stinner9bee32b2020-04-22 16:30:35 +02002848 def kill_process(pid):
2849 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002850
Victor Stinner9bee32b2020-04-22 16:30:35 +02002851 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002852
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002853
Victor Stinner4659ccf2016-09-14 10:57:00 +02002854class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002855 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002856 self.exitcode = 17
2857
2858 filename = support.TESTFN
2859 self.addCleanup(support.unlink, filename)
2860
2861 if not with_env:
2862 code = 'import sys; sys.exit(%s)' % self.exitcode
2863 else:
2864 self.env = dict(os.environ)
2865 # create an unique key
2866 self.key = str(uuid.uuid4())
2867 self.env[self.key] = self.key
2868 # read the variable from os.environ to check that it exists
2869 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2870 % (self.key, self.exitcode))
2871
2872 with open(filename, "w") as fp:
2873 fp.write(code)
2874
Berker Peksag81816462016-09-15 20:19:47 +03002875 args = [sys.executable, filename]
2876 if use_bytes:
2877 args = [os.fsencode(a) for a in args]
2878 self.env = {os.fsencode(k): os.fsencode(v)
2879 for k, v in self.env.items()}
2880
2881 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002882
Berker Peksag4af23d72016-09-15 20:32:44 +03002883 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002884 def test_spawnl(self):
2885 args = self.create_args()
2886 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2887 self.assertEqual(exitcode, self.exitcode)
2888
Berker Peksag4af23d72016-09-15 20:32:44 +03002889 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002890 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002891 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002892 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2893 self.assertEqual(exitcode, self.exitcode)
2894
Berker Peksag4af23d72016-09-15 20:32:44 +03002895 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002896 def test_spawnlp(self):
2897 args = self.create_args()
2898 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2899 self.assertEqual(exitcode, self.exitcode)
2900
Berker Peksag4af23d72016-09-15 20:32:44 +03002901 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002902 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002903 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002904 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2905 self.assertEqual(exitcode, self.exitcode)
2906
Berker Peksag4af23d72016-09-15 20:32:44 +03002907 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002908 def test_spawnv(self):
2909 args = self.create_args()
2910 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2911 self.assertEqual(exitcode, self.exitcode)
2912
Victor Stinner9bee32b2020-04-22 16:30:35 +02002913 # Test for PyUnicode_FSConverter()
2914 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
2915 self.assertEqual(exitcode, self.exitcode)
2916
Berker Peksag4af23d72016-09-15 20:32:44 +03002917 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002918 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002919 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002920 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2921 self.assertEqual(exitcode, self.exitcode)
2922
Berker Peksag4af23d72016-09-15 20:32:44 +03002923 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002924 def test_spawnvp(self):
2925 args = self.create_args()
2926 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2927 self.assertEqual(exitcode, self.exitcode)
2928
Berker Peksag4af23d72016-09-15 20:32:44 +03002929 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002930 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002931 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002932 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2933 self.assertEqual(exitcode, self.exitcode)
2934
Berker Peksag4af23d72016-09-15 20:32:44 +03002935 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002936 def test_nowait(self):
2937 args = self.create_args()
2938 pid = os.spawnv(os.P_NOWAIT, args[0], args)
Victor Stinner278c1e12020-03-31 20:08:12 +02002939 support.wait_process(pid, exitcode=self.exitcode)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002940
Berker Peksag4af23d72016-09-15 20:32:44 +03002941 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002942 def test_spawnve_bytes(self):
2943 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2944 args = self.create_args(with_env=True, use_bytes=True)
2945 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2946 self.assertEqual(exitcode, self.exitcode)
2947
Steve Dower859fd7b2016-11-19 18:53:19 -08002948 @requires_os_func('spawnl')
2949 def test_spawnl_noargs(self):
2950 args = self.create_args()
2951 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002952 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002953
2954 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002955 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002956 args = self.create_args()
2957 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002958 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002959
2960 @requires_os_func('spawnv')
2961 def test_spawnv_noargs(self):
2962 args = self.create_args()
2963 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2964 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002965 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2966 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002967
2968 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002969 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002970 args = self.create_args()
2971 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2972 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002973 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2974 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002975
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002976 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002977 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002978
Ville Skyttä49b27342017-08-03 09:00:59 +03002979 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002980 newenv = os.environ.copy()
2981 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2982 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002983 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002984 except ValueError:
2985 pass
2986 else:
2987 self.assertEqual(exitcode, 127)
2988
Ville Skyttä49b27342017-08-03 09:00:59 +03002989 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002990 newenv = os.environ.copy()
2991 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2992 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002993 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002994 except ValueError:
2995 pass
2996 else:
2997 self.assertEqual(exitcode, 127)
2998
Ville Skyttä49b27342017-08-03 09:00:59 +03002999 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03003000 newenv = os.environ.copy()
3001 newenv["FRUIT=ORANGE"] = "lemon"
3002 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003003 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003004 except ValueError:
3005 pass
3006 else:
3007 self.assertEqual(exitcode, 127)
3008
Ville Skyttä49b27342017-08-03 09:00:59 +03003009 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03003010 filename = support.TESTFN
3011 self.addCleanup(support.unlink, filename)
3012 with open(filename, "w") as fp:
3013 fp.write('import sys, os\n'
3014 'if os.getenv("FRUIT") != "orange=lemon":\n'
3015 ' raise AssertionError')
3016 args = [sys.executable, filename]
3017 newenv = os.environ.copy()
3018 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003019 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03003020 self.assertEqual(exitcode, 0)
3021
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03003022 @requires_os_func('spawnve')
3023 def test_spawnve_invalid_env(self):
3024 self._test_invalid_env(os.spawnve)
3025
3026 @requires_os_func('spawnvpe')
3027 def test_spawnvpe_invalid_env(self):
3028 self._test_invalid_env(os.spawnvpe)
3029
Serhiy Storchaka77703942017-06-25 07:33:01 +03003030
Brian Curtin0151b8e2010-09-24 13:43:43 +00003031# The introduction of this TestCase caused at least two different errors on
3032# *nix buildbots. Temporarily skip this to let the buildbots move along.
3033@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00003034@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3035class LoginTests(unittest.TestCase):
3036 def test_getlogin(self):
3037 user_name = os.getlogin()
3038 self.assertNotEqual(len(user_name), 0)
3039
3040
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003041@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3042 "needs os.getpriority and os.setpriority")
3043class ProgramPriorityTests(unittest.TestCase):
3044 """Tests for os.getpriority() and os.setpriority()."""
3045
3046 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003047
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003048 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3049 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3050 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003051 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3052 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01003053 raise unittest.SkipTest("unable to reliably test setpriority "
3054 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00003055 else:
3056 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003057 finally:
3058 try:
3059 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3060 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00003061 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00003062 raise
3063
3064
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003065class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003066
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003067 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003068
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003069 def __init__(self, conn):
3070 asynchat.async_chat.__init__(self, conn)
3071 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003072 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003073 self.closed = False
3074 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003075
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003076 def handle_read(self):
3077 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003078 if self.accumulate:
3079 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003080
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003081 def get_data(self):
3082 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003083
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003085 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003087
3088 def handle_error(self):
3089 raise
3090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003091 def __init__(self, address):
3092 threading.Thread.__init__(self)
3093 asyncore.dispatcher.__init__(self)
3094 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3095 self.bind(address)
3096 self.listen(5)
3097 self.host, self.port = self.socket.getsockname()[:2]
3098 self.handler_instance = None
3099 self._active = False
3100 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003101
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003102 # --- public API
3103
3104 @property
3105 def running(self):
3106 return self._active
3107
3108 def start(self):
3109 assert not self.running
3110 self.__flag = threading.Event()
3111 threading.Thread.start(self)
3112 self.__flag.wait()
3113
3114 def stop(self):
3115 assert self.running
3116 self._active = False
3117 self.join()
3118
3119 def wait(self):
3120 # wait for handler connection to be closed, then stop the server
3121 while not getattr(self.handler_instance, "closed", False):
3122 time.sleep(0.001)
3123 self.stop()
3124
3125 # --- internals
3126
3127 def run(self):
3128 self._active = True
3129 self.__flag.set()
3130 while self._active and asyncore.socket_map:
3131 self._active_lock.acquire()
3132 asyncore.loop(timeout=0.001, count=1)
3133 self._active_lock.release()
3134 asyncore.close_all()
3135
3136 def handle_accept(self):
3137 conn, addr = self.accept()
3138 self.handler_instance = self.Handler(conn)
3139
3140 def handle_connect(self):
3141 self.close()
3142 handle_read = handle_connect
3143
3144 def writable(self):
3145 return 0
3146
3147 def handle_error(self):
3148 raise
3149
3150
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003151@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3152class TestSendfile(unittest.TestCase):
3153
Victor Stinner8c663fd2017-11-08 14:44:44 -08003154 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003155 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00003156 not sys.platform.startswith("solaris") and \
3157 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02003158 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3159 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003160 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3161 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003162
3163 @classmethod
3164 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003165 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01003166 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003167
3168 @classmethod
3169 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05003170 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003171 support.unlink(support.TESTFN)
3172
3173 def setUp(self):
3174 self.server = SendfileTestServer((support.HOST, 0))
3175 self.server.start()
3176 self.client = socket.socket()
3177 self.client.connect((self.server.host, self.server.port))
3178 self.client.settimeout(1)
3179 # synchronize by waiting for "220 ready" response
3180 self.client.recv(1024)
3181 self.sockno = self.client.fileno()
3182 self.file = open(support.TESTFN, 'rb')
3183 self.fileno = self.file.fileno()
3184
3185 def tearDown(self):
3186 self.file.close()
3187 self.client.close()
3188 if self.server.running:
3189 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003190 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003191
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003192 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003193 """A higher level wrapper representing how an application is
3194 supposed to use sendfile().
3195 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003196 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003197 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003198 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003199 except OSError as err:
3200 if err.errno == errno.ECONNRESET:
3201 # disconnected
3202 raise
3203 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3204 # we have to retry send data
3205 continue
3206 else:
3207 raise
3208
3209 def test_send_whole_file(self):
3210 # normal send
3211 total_sent = 0
3212 offset = 0
3213 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003214 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003215 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3216 if sent == 0:
3217 break
3218 offset += sent
3219 total_sent += sent
3220 self.assertTrue(sent <= nbytes)
3221 self.assertEqual(offset, total_sent)
3222
3223 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003224 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003225 self.client.close()
3226 self.server.wait()
3227 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003228 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003229 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003230
3231 def test_send_at_certain_offset(self):
3232 # start sending a file at a certain offset
3233 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003234 offset = len(self.DATA) // 2
3235 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003236 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003237 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003238 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3239 if sent == 0:
3240 break
3241 offset += sent
3242 total_sent += sent
3243 self.assertTrue(sent <= nbytes)
3244
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003245 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003246 self.client.close()
3247 self.server.wait()
3248 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003249 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003250 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003251 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003252 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003253
3254 def test_offset_overflow(self):
3255 # specify an offset > file size
3256 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003257 try:
3258 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3259 except OSError as e:
3260 # Solaris can raise EINVAL if offset >= file length, ignore.
3261 if e.errno != errno.EINVAL:
3262 raise
3263 else:
3264 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003265 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003266 self.client.close()
3267 self.server.wait()
3268 data = self.server.handler_instance.get_data()
3269 self.assertEqual(data, b'')
3270
3271 def test_invalid_offset(self):
3272 with self.assertRaises(OSError) as cm:
3273 os.sendfile(self.sockno, self.fileno, -1, 4096)
3274 self.assertEqual(cm.exception.errno, errno.EINVAL)
3275
Martin Panterbf19d162015-09-09 01:01:13 +00003276 def test_keywords(self):
3277 # Keyword arguments should be supported
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003278 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3279 offset=0, count=4096)
Martin Panterbf19d162015-09-09 01:01:13 +00003280 if self.SUPPORT_HEADERS_TRAILERS:
Serhiy Storchaka140a7d12019-10-13 11:59:31 +03003281 os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3282 offset=0, count=4096,
3283 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003284
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003285 # --- headers / trailers tests
3286
Serhiy Storchaka43767632013-11-03 21:31:38 +02003287 @requires_headers_trailers
3288 def test_headers(self):
3289 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003290 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003291 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003292 headers=[b"x" * 512, b"y" * 256])
3293 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003294 total_sent += sent
3295 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003296 while total_sent < len(expected_data):
3297 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003298 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3299 offset, nbytes)
3300 if sent == 0:
3301 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003302 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003303 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003304 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003305
Serhiy Storchaka43767632013-11-03 21:31:38 +02003306 self.assertEqual(total_sent, len(expected_data))
3307 self.client.close()
3308 self.server.wait()
3309 data = self.server.handler_instance.get_data()
3310 self.assertEqual(hash(data), hash(expected_data))
3311
3312 @requires_headers_trailers
3313 def test_trailers(self):
3314 TESTFN2 = support.TESTFN + "2"
3315 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003316
3317 self.addCleanup(support.unlink, TESTFN2)
3318 create_file(TESTFN2, file_data)
3319
3320 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003321 os.sendfile(self.sockno, f.fileno(), 0, 5,
3322 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003323 self.client.close()
3324 self.server.wait()
3325 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003326 self.assertEqual(data, b"abcde123456789")
3327
3328 @requires_headers_trailers
3329 @requires_32b
3330 def test_headers_overflow_32bits(self):
3331 self.server.handler_instance.accumulate = False
3332 with self.assertRaises(OSError) as cm:
3333 os.sendfile(self.sockno, self.fileno, 0, 0,
3334 headers=[b"x" * 2**16] * 2**15)
3335 self.assertEqual(cm.exception.errno, errno.EINVAL)
3336
3337 @requires_headers_trailers
3338 @requires_32b
3339 def test_trailers_overflow_32bits(self):
3340 self.server.handler_instance.accumulate = False
3341 with self.assertRaises(OSError) as cm:
3342 os.sendfile(self.sockno, self.fileno, 0, 0,
3343 trailers=[b"x" * 2**16] * 2**15)
3344 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003345
Serhiy Storchaka43767632013-11-03 21:31:38 +02003346 @requires_headers_trailers
3347 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3348 'test needs os.SF_NODISKIO')
3349 def test_flags(self):
3350 try:
3351 os.sendfile(self.sockno, self.fileno, 0, 4096,
3352 flags=os.SF_NODISKIO)
3353 except OSError as err:
3354 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3355 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003356
3357
Larry Hastings9cf065c2012-06-22 16:30:09 -07003358def supports_extended_attributes():
3359 if not hasattr(os, "setxattr"):
3360 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003361
Larry Hastings9cf065c2012-06-22 16:30:09 -07003362 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003363 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003364 try:
3365 os.setxattr(fp.fileno(), b"user.test", b"")
3366 except OSError:
3367 return False
3368 finally:
3369 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003370
3371 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003372
3373
3374@unittest.skipUnless(supports_extended_attributes(),
3375 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003376# Kernels < 2.6.39 don't respect setxattr flags.
3377@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003378class ExtendedAttributeTests(unittest.TestCase):
3379
Larry Hastings9cf065c2012-06-22 16:30:09 -07003380 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003381 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003382 self.addCleanup(support.unlink, fn)
3383 create_file(fn)
3384
Benjamin Peterson799bd802011-08-31 22:15:17 -04003385 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003386 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003387 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003388
Victor Stinnerf12e5062011-10-16 22:12:03 +02003389 init_xattr = listxattr(fn)
3390 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003391
Larry Hastings9cf065c2012-06-22 16:30:09 -07003392 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003393 xattr = set(init_xattr)
3394 xattr.add("user.test")
3395 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003396 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3397 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3398 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003399
Benjamin Peterson799bd802011-08-31 22:15:17 -04003400 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003401 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003402 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003403
Benjamin Peterson799bd802011-08-31 22:15:17 -04003404 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003405 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003406 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003407
Larry Hastings9cf065c2012-06-22 16:30:09 -07003408 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003409 xattr.add("user.test2")
3410 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003411 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003412
Benjamin Peterson799bd802011-08-31 22:15:17 -04003413 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003414 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003415 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003416
Victor Stinnerf12e5062011-10-16 22:12:03 +02003417 xattr.remove("user.test")
3418 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003419 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3420 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3421 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3422 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003423 many = sorted("user.test{}".format(i) for i in range(100))
3424 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003425 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003426 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003427
Larry Hastings9cf065c2012-06-22 16:30:09 -07003428 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003429 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003430 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003431
3432 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3433 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003434
3435 def test_simple(self):
3436 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3437 os.listxattr)
3438
3439 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003440 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3441 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003442
3443 def test_fds(self):
3444 def getxattr(path, *args):
3445 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003446 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003447 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003448 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003449 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003450 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003451 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003452 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003453 def listxattr(path, *args):
3454 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003455 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003456 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3457
3458
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003459@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3460class TermsizeTests(unittest.TestCase):
3461 def test_does_not_crash(self):
3462 """Check if get_terminal_size() returns a meaningful value.
3463
3464 There's no easy portable way to actually check the size of the
3465 terminal, so let's check if it returns something sensible instead.
3466 """
3467 try:
3468 size = os.get_terminal_size()
3469 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003470 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003471 # Under win32 a generic OSError can be thrown if the
3472 # handle cannot be retrieved
3473 self.skipTest("failed to query terminal size")
3474 raise
3475
Antoine Pitroucfade362012-02-08 23:48:59 +01003476 self.assertGreaterEqual(size.columns, 0)
3477 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003478
3479 def test_stty_match(self):
3480 """Check if stty returns the same results
3481
3482 stty actually tests stdin, so get_terminal_size is invoked on
3483 stdin explicitly. If stty succeeded, then get_terminal_size()
3484 should work too.
3485 """
3486 try:
3487 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003488 except (FileNotFoundError, subprocess.CalledProcessError,
3489 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003490 self.skipTest("stty invocation failed")
3491 expected = (int(size[1]), int(size[0])) # reversed order
3492
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003493 try:
3494 actual = os.get_terminal_size(sys.__stdin__.fileno())
3495 except OSError as e:
3496 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3497 # Under win32 a generic OSError can be thrown if the
3498 # handle cannot be retrieved
3499 self.skipTest("failed to query terminal size")
3500 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003501 self.assertEqual(expected, actual)
3502
3503
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003504@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003505@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003506class MemfdCreateTests(unittest.TestCase):
3507 def test_memfd_create(self):
3508 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3509 self.assertNotEqual(fd, -1)
3510 self.addCleanup(os.close, fd)
3511 self.assertFalse(os.get_inheritable(fd))
3512 with open(fd, "wb", closefd=False) as f:
3513 f.write(b'memfd_create')
3514 self.assertEqual(f.tell(), 12)
3515
3516 fd2 = os.memfd_create("Hi")
3517 self.addCleanup(os.close, fd2)
3518 self.assertFalse(os.get_inheritable(fd2))
3519
3520
Victor Stinner292c8352012-10-30 02:17:38 +01003521class OSErrorTests(unittest.TestCase):
3522 def setUp(self):
3523 class Str(str):
3524 pass
3525
Victor Stinnerafe17062012-10-31 22:47:43 +01003526 self.bytes_filenames = []
3527 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003528 if support.TESTFN_UNENCODABLE is not None:
3529 decoded = support.TESTFN_UNENCODABLE
3530 else:
3531 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003532 self.unicode_filenames.append(decoded)
3533 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003534 if support.TESTFN_UNDECODABLE is not None:
3535 encoded = support.TESTFN_UNDECODABLE
3536 else:
3537 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003538 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003539 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003540 self.bytes_filenames.append(memoryview(encoded))
3541
3542 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003543
3544 def test_oserror_filename(self):
3545 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003546 (self.filenames, os.chdir,),
3547 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003548 (self.filenames, os.lstat,),
3549 (self.filenames, os.open, os.O_RDONLY),
3550 (self.filenames, os.rmdir,),
3551 (self.filenames, os.stat,),
3552 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003553 ]
3554 if sys.platform == "win32":
3555 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003556 (self.bytes_filenames, os.rename, b"dst"),
3557 (self.bytes_filenames, os.replace, b"dst"),
3558 (self.unicode_filenames, os.rename, "dst"),
3559 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003560 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003561 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003562 else:
3563 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003564 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003565 (self.filenames, os.rename, "dst"),
3566 (self.filenames, os.replace, "dst"),
3567 ))
3568 if hasattr(os, "chown"):
3569 funcs.append((self.filenames, os.chown, 0, 0))
3570 if hasattr(os, "lchown"):
3571 funcs.append((self.filenames, os.lchown, 0, 0))
3572 if hasattr(os, "truncate"):
3573 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003574 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003575 funcs.append((self.filenames, os.chflags, 0))
3576 if hasattr(os, "lchflags"):
3577 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003578 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003579 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003580 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003581 if sys.platform == "win32":
3582 funcs.append((self.bytes_filenames, os.link, b"dst"))
3583 funcs.append((self.unicode_filenames, os.link, "dst"))
3584 else:
3585 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003586 if hasattr(os, "listxattr"):
3587 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003588 (self.filenames, os.listxattr,),
3589 (self.filenames, os.getxattr, "user.test"),
3590 (self.filenames, os.setxattr, "user.test", b'user'),
3591 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003592 ))
3593 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003594 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003595 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003596 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003597
Steve Dowercc16be82016-09-08 10:35:16 -07003598
Victor Stinnerafe17062012-10-31 22:47:43 +01003599 for filenames, func, *func_args in funcs:
3600 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003601 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003602 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003603 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003604 else:
3605 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3606 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003607 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003608 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003609 except UnicodeDecodeError:
3610 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003611 else:
3612 self.fail("No exception thrown by {}".format(func))
3613
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003614class CPUCountTests(unittest.TestCase):
3615 def test_cpu_count(self):
3616 cpus = os.cpu_count()
3617 if cpus is not None:
3618 self.assertIsInstance(cpus, int)
3619 self.assertGreater(cpus, 0)
3620 else:
3621 self.skipTest("Could not determine the number of CPUs")
3622
Victor Stinnerdaf45552013-08-28 00:53:59 +02003623
3624class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003625 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003626 fd = os.open(__file__, os.O_RDONLY)
3627 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003628 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003629
Victor Stinnerdaf45552013-08-28 00:53:59 +02003630 os.set_inheritable(fd, True)
3631 self.assertEqual(os.get_inheritable(fd), True)
3632
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003633 @unittest.skipIf(fcntl is None, "need fcntl")
3634 def test_get_inheritable_cloexec(self):
3635 fd = os.open(__file__, os.O_RDONLY)
3636 self.addCleanup(os.close, fd)
3637 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003638
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003639 # clear FD_CLOEXEC flag
3640 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3641 flags &= ~fcntl.FD_CLOEXEC
3642 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003643
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003644 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003645
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003646 @unittest.skipIf(fcntl is None, "need fcntl")
3647 def test_set_inheritable_cloexec(self):
3648 fd = os.open(__file__, os.O_RDONLY)
3649 self.addCleanup(os.close, fd)
3650 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3651 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003652
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003653 os.set_inheritable(fd, True)
3654 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3655 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003656
Victor Stinnerdaf45552013-08-28 00:53:59 +02003657 def test_open(self):
3658 fd = os.open(__file__, os.O_RDONLY)
3659 self.addCleanup(os.close, fd)
3660 self.assertEqual(os.get_inheritable(fd), False)
3661
3662 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3663 def test_pipe(self):
3664 rfd, wfd = os.pipe()
3665 self.addCleanup(os.close, rfd)
3666 self.addCleanup(os.close, wfd)
3667 self.assertEqual(os.get_inheritable(rfd), False)
3668 self.assertEqual(os.get_inheritable(wfd), False)
3669
3670 def test_dup(self):
3671 fd1 = os.open(__file__, os.O_RDONLY)
3672 self.addCleanup(os.close, fd1)
3673
3674 fd2 = os.dup(fd1)
3675 self.addCleanup(os.close, fd2)
3676 self.assertEqual(os.get_inheritable(fd2), False)
3677
Zackery Spytz5be66602019-08-23 12:38:41 -06003678 def test_dup_standard_stream(self):
3679 fd = os.dup(1)
3680 self.addCleanup(os.close, fd)
3681 self.assertGreater(fd, 0)
3682
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003683 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3684 def test_dup_nul(self):
3685 # os.dup() was creating inheritable fds for character files.
3686 fd1 = os.open('NUL', os.O_RDONLY)
3687 self.addCleanup(os.close, fd1)
3688 fd2 = os.dup(fd1)
3689 self.addCleanup(os.close, fd2)
3690 self.assertFalse(os.get_inheritable(fd2))
3691
Victor Stinnerdaf45552013-08-28 00:53:59 +02003692 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3693 def test_dup2(self):
3694 fd = os.open(__file__, os.O_RDONLY)
3695 self.addCleanup(os.close, fd)
3696
3697 # inheritable by default
3698 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003699 self.addCleanup(os.close, fd2)
3700 self.assertEqual(os.dup2(fd, fd2), fd2)
3701 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003702
3703 # force non-inheritable
3704 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003705 self.addCleanup(os.close, fd3)
3706 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3707 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003708
3709 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3710 def test_openpty(self):
3711 master_fd, slave_fd = os.openpty()
3712 self.addCleanup(os.close, master_fd)
3713 self.addCleanup(os.close, slave_fd)
3714 self.assertEqual(os.get_inheritable(master_fd), False)
3715 self.assertEqual(os.get_inheritable(slave_fd), False)
3716
3717
Brett Cannon3f9183b2016-08-26 14:44:48 -07003718class PathTConverterTests(unittest.TestCase):
3719 # tuples of (function name, allows fd arguments, additional arguments to
3720 # function, cleanup function)
3721 functions = [
3722 ('stat', True, (), None),
3723 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003724 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003725 ('chflags', False, (0,), None),
3726 ('lchflags', False, (0,), None),
3727 ('open', False, (0,), getattr(os, 'close', None)),
3728 ]
3729
3730 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003731 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003732 if os.name == 'nt':
3733 bytes_fspath = bytes_filename = None
3734 else:
3735 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003736 bytes_fspath = FakePath(bytes_filename)
3737 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003738 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003739 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003740
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003741 int_fspath = FakePath(fd)
3742 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003743
3744 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3745 with self.subTest(name=name):
3746 try:
3747 fn = getattr(os, name)
3748 except AttributeError:
3749 continue
3750
Brett Cannon8f96a302016-08-26 19:30:11 -07003751 for path in (str_filename, bytes_filename, str_fspath,
3752 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003753 if path is None:
3754 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003755 with self.subTest(name=name, path=path):
3756 result = fn(path, *extra_args)
3757 if cleanup_fn is not None:
3758 cleanup_fn(result)
3759
3760 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003761 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003762 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003763
3764 if allow_fd:
3765 result = fn(fd, *extra_args) # should not fail
3766 if cleanup_fn is not None:
3767 cleanup_fn(result)
3768 else:
3769 with self.assertRaisesRegex(
3770 TypeError,
3771 'os.PathLike'):
3772 fn(fd, *extra_args)
3773
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003774 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003775 msg = r'__fspath__\(\) to return str or bytes, not %s'
3776 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003777 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003778 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003779 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003780 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003781 os.stat(FakePath(object()))
3782
Brett Cannon3f9183b2016-08-26 14:44:48 -07003783
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003784@unittest.skipUnless(hasattr(os, 'get_blocking'),
3785 'needs os.get_blocking() and os.set_blocking()')
3786class BlockingTests(unittest.TestCase):
3787 def test_blocking(self):
3788 fd = os.open(__file__, os.O_RDONLY)
3789 self.addCleanup(os.close, fd)
3790 self.assertEqual(os.get_blocking(fd), True)
3791
3792 os.set_blocking(fd, False)
3793 self.assertEqual(os.get_blocking(fd), False)
3794
3795 os.set_blocking(fd, True)
3796 self.assertEqual(os.get_blocking(fd), True)
3797
3798
Yury Selivanov97e2e062014-09-26 12:33:06 -04003799
3800class ExportsTests(unittest.TestCase):
3801 def test_os_all(self):
3802 self.assertIn('open', os.__all__)
3803 self.assertIn('walk', os.__all__)
3804
3805
Eddie Elizondob3966632019-11-05 07:16:14 -08003806class TestDirEntry(unittest.TestCase):
3807 def setUp(self):
3808 self.path = os.path.realpath(support.TESTFN)
3809 self.addCleanup(support.rmtree, self.path)
3810 os.mkdir(self.path)
3811
3812 def test_uninstantiable(self):
3813 self.assertRaises(TypeError, os.DirEntry)
3814
3815 def test_unpickable(self):
3816 filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3817 entry = [entry for entry in os.scandir(self.path)].pop()
3818 self.assertIsInstance(entry, os.DirEntry)
3819 self.assertEqual(entry.name, "file.txt")
3820 import pickle
3821 self.assertRaises(TypeError, pickle.dumps, entry, filename)
3822
3823
Victor Stinner6036e442015-03-08 01:58:04 +01003824class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003825 check_no_resource_warning = support.check_no_resource_warning
3826
Victor Stinner6036e442015-03-08 01:58:04 +01003827 def setUp(self):
3828 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003829 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003830 self.addCleanup(support.rmtree, self.path)
3831 os.mkdir(self.path)
3832
3833 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003834 path = self.bytes_path if isinstance(name, bytes) else self.path
3835 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003836 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003837 return filename
3838
3839 def get_entries(self, names):
3840 entries = dict((entry.name, entry)
3841 for entry in os.scandir(self.path))
3842 self.assertEqual(sorted(entries.keys()), names)
3843 return entries
3844
3845 def assert_stat_equal(self, stat1, stat2, skip_fields):
3846 if skip_fields:
3847 for attr in dir(stat1):
3848 if not attr.startswith("st_"):
3849 continue
3850 if attr in ("st_dev", "st_ino", "st_nlink"):
3851 continue
3852 self.assertEqual(getattr(stat1, attr),
3853 getattr(stat2, attr),
3854 (stat1, stat2, attr))
3855 else:
3856 self.assertEqual(stat1, stat2)
3857
Eddie Elizondob3966632019-11-05 07:16:14 -08003858 def test_uninstantiable(self):
3859 scandir_iter = os.scandir(self.path)
3860 self.assertRaises(TypeError, type(scandir_iter))
3861 scandir_iter.close()
3862
3863 def test_unpickable(self):
3864 filename = self.create_file("file.txt")
3865 scandir_iter = os.scandir(self.path)
3866 import pickle
3867 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3868 scandir_iter.close()
3869
Victor Stinner6036e442015-03-08 01:58:04 +01003870 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003871 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003872 self.assertEqual(entry.name, name)
3873 self.assertEqual(entry.path, os.path.join(self.path, name))
3874 self.assertEqual(entry.inode(),
3875 os.stat(entry.path, follow_symlinks=False).st_ino)
3876
3877 entry_stat = os.stat(entry.path)
3878 self.assertEqual(entry.is_dir(),
3879 stat.S_ISDIR(entry_stat.st_mode))
3880 self.assertEqual(entry.is_file(),
3881 stat.S_ISREG(entry_stat.st_mode))
3882 self.assertEqual(entry.is_symlink(),
3883 os.path.islink(entry.path))
3884
3885 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3886 self.assertEqual(entry.is_dir(follow_symlinks=False),
3887 stat.S_ISDIR(entry_lstat.st_mode))
3888 self.assertEqual(entry.is_file(follow_symlinks=False),
3889 stat.S_ISREG(entry_lstat.st_mode))
3890
3891 self.assert_stat_equal(entry.stat(),
3892 entry_stat,
3893 os.name == 'nt' and not is_symlink)
3894 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3895 entry_lstat,
3896 os.name == 'nt')
3897
3898 def test_attributes(self):
3899 link = hasattr(os, 'link')
3900 symlink = support.can_symlink()
3901
3902 dirname = os.path.join(self.path, "dir")
3903 os.mkdir(dirname)
3904 filename = self.create_file("file.txt")
3905 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003906 try:
3907 os.link(filename, os.path.join(self.path, "link_file.txt"))
3908 except PermissionError as e:
3909 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003910 if symlink:
3911 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3912 target_is_directory=True)
3913 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3914
3915 names = ['dir', 'file.txt']
3916 if link:
3917 names.append('link_file.txt')
3918 if symlink:
3919 names.extend(('symlink_dir', 'symlink_file.txt'))
3920 entries = self.get_entries(names)
3921
3922 entry = entries['dir']
3923 self.check_entry(entry, 'dir', True, False, False)
3924
3925 entry = entries['file.txt']
3926 self.check_entry(entry, 'file.txt', False, True, False)
3927
3928 if link:
3929 entry = entries['link_file.txt']
3930 self.check_entry(entry, 'link_file.txt', False, True, False)
3931
3932 if symlink:
3933 entry = entries['symlink_dir']
3934 self.check_entry(entry, 'symlink_dir', True, False, True)
3935
3936 entry = entries['symlink_file.txt']
3937 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3938
3939 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003940 path = self.bytes_path if isinstance(name, bytes) else self.path
3941 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003942 self.assertEqual(len(entries), 1)
3943
3944 entry = entries[0]
3945 self.assertEqual(entry.name, name)
3946 return entry
3947
Brett Cannon96881cd2016-06-10 14:37:21 -07003948 def create_file_entry(self, name='file.txt'):
3949 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003950 return self.get_entry(os.path.basename(filename))
3951
3952 def test_current_directory(self):
3953 filename = self.create_file()
3954 old_dir = os.getcwd()
3955 try:
3956 os.chdir(self.path)
3957
3958 # call scandir() without parameter: it must list the content
3959 # of the current directory
3960 entries = dict((entry.name, entry) for entry in os.scandir())
3961 self.assertEqual(sorted(entries.keys()),
3962 [os.path.basename(filename)])
3963 finally:
3964 os.chdir(old_dir)
3965
3966 def test_repr(self):
3967 entry = self.create_file_entry()
3968 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3969
Brett Cannon96881cd2016-06-10 14:37:21 -07003970 def test_fspath_protocol(self):
3971 entry = self.create_file_entry()
3972 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3973
3974 def test_fspath_protocol_bytes(self):
3975 bytes_filename = os.fsencode('bytesfile.txt')
3976 bytes_entry = self.create_file_entry(name=bytes_filename)
3977 fspath = os.fspath(bytes_entry)
3978 self.assertIsInstance(fspath, bytes)
3979 self.assertEqual(fspath,
3980 os.path.join(os.fsencode(self.path),bytes_filename))
3981
Victor Stinner6036e442015-03-08 01:58:04 +01003982 def test_removed_dir(self):
3983 path = os.path.join(self.path, 'dir')
3984
3985 os.mkdir(path)
3986 entry = self.get_entry('dir')
3987 os.rmdir(path)
3988
3989 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3990 if os.name == 'nt':
3991 self.assertTrue(entry.is_dir())
3992 self.assertFalse(entry.is_file())
3993 self.assertFalse(entry.is_symlink())
3994 if os.name == 'nt':
3995 self.assertRaises(FileNotFoundError, entry.inode)
3996 # don't fail
3997 entry.stat()
3998 entry.stat(follow_symlinks=False)
3999 else:
4000 self.assertGreater(entry.inode(), 0)
4001 self.assertRaises(FileNotFoundError, entry.stat)
4002 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4003
4004 def test_removed_file(self):
4005 entry = self.create_file_entry()
4006 os.unlink(entry.path)
4007
4008 self.assertFalse(entry.is_dir())
4009 # On POSIX, is_dir() result depends if scandir() filled d_type or not
4010 if os.name == 'nt':
4011 self.assertTrue(entry.is_file())
4012 self.assertFalse(entry.is_symlink())
4013 if os.name == 'nt':
4014 self.assertRaises(FileNotFoundError, entry.inode)
4015 # don't fail
4016 entry.stat()
4017 entry.stat(follow_symlinks=False)
4018 else:
4019 self.assertGreater(entry.inode(), 0)
4020 self.assertRaises(FileNotFoundError, entry.stat)
4021 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4022
4023 def test_broken_symlink(self):
4024 if not support.can_symlink():
4025 return self.skipTest('cannot create symbolic link')
4026
4027 filename = self.create_file("file.txt")
4028 os.symlink(filename,
4029 os.path.join(self.path, "symlink.txt"))
4030 entries = self.get_entries(['file.txt', 'symlink.txt'])
4031 entry = entries['symlink.txt']
4032 os.unlink(filename)
4033
4034 self.assertGreater(entry.inode(), 0)
4035 self.assertFalse(entry.is_dir())
4036 self.assertFalse(entry.is_file()) # broken symlink returns False
4037 self.assertFalse(entry.is_dir(follow_symlinks=False))
4038 self.assertFalse(entry.is_file(follow_symlinks=False))
4039 self.assertTrue(entry.is_symlink())
4040 self.assertRaises(FileNotFoundError, entry.stat)
4041 # don't fail
4042 entry.stat(follow_symlinks=False)
4043
4044 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01004045 self.create_file("file.txt")
4046
4047 path_bytes = os.fsencode(self.path)
4048 entries = list(os.scandir(path_bytes))
4049 self.assertEqual(len(entries), 1, entries)
4050 entry = entries[0]
4051
4052 self.assertEqual(entry.name, b'file.txt')
4053 self.assertEqual(entry.path,
4054 os.fsencode(os.path.join(self.path, 'file.txt')))
4055
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03004056 def test_bytes_like(self):
4057 self.create_file("file.txt")
4058
4059 for cls in bytearray, memoryview:
4060 path_bytes = cls(os.fsencode(self.path))
4061 with self.assertWarns(DeprecationWarning):
4062 entries = list(os.scandir(path_bytes))
4063 self.assertEqual(len(entries), 1, entries)
4064 entry = entries[0]
4065
4066 self.assertEqual(entry.name, b'file.txt')
4067 self.assertEqual(entry.path,
4068 os.fsencode(os.path.join(self.path, 'file.txt')))
4069 self.assertIs(type(entry.name), bytes)
4070 self.assertIs(type(entry.path), bytes)
4071
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004072 @unittest.skipUnless(os.listdir in os.supports_fd,
4073 'fd support for listdir required for this test.')
4074 def test_fd(self):
4075 self.assertIn(os.scandir, os.supports_fd)
4076 self.create_file('file.txt')
4077 expected_names = ['file.txt']
4078 if support.can_symlink():
4079 os.symlink('file.txt', os.path.join(self.path, 'link'))
4080 expected_names.append('link')
4081
4082 fd = os.open(self.path, os.O_RDONLY)
4083 try:
4084 with os.scandir(fd) as it:
4085 entries = list(it)
4086 names = [entry.name for entry in entries]
4087 self.assertEqual(sorted(names), expected_names)
4088 self.assertEqual(names, os.listdir(fd))
4089 for entry in entries:
4090 self.assertEqual(entry.path, entry.name)
4091 self.assertEqual(os.fspath(entry), entry.name)
4092 self.assertEqual(entry.is_symlink(), entry.name == 'link')
4093 if os.stat in os.supports_dir_fd:
4094 st = os.stat(entry.name, dir_fd=fd)
4095 self.assertEqual(entry.stat(), st)
4096 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4097 self.assertEqual(entry.stat(follow_symlinks=False), st)
4098 finally:
4099 os.close(fd)
4100
Victor Stinner6036e442015-03-08 01:58:04 +01004101 def test_empty_path(self):
4102 self.assertRaises(FileNotFoundError, os.scandir, '')
4103
4104 def test_consume_iterator_twice(self):
4105 self.create_file("file.txt")
4106 iterator = os.scandir(self.path)
4107
4108 entries = list(iterator)
4109 self.assertEqual(len(entries), 1, entries)
4110
4111 # check than consuming the iterator twice doesn't raise exception
4112 entries2 = list(iterator)
4113 self.assertEqual(len(entries2), 0, entries2)
4114
4115 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03004116 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01004117 self.assertRaises(TypeError, os.scandir, obj)
4118
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02004119 def test_close(self):
4120 self.create_file("file.txt")
4121 self.create_file("file2.txt")
4122 iterator = os.scandir(self.path)
4123 next(iterator)
4124 iterator.close()
4125 # multiple closes
4126 iterator.close()
4127 with self.check_no_resource_warning():
4128 del iterator
4129
4130 def test_context_manager(self):
4131 self.create_file("file.txt")
4132 self.create_file("file2.txt")
4133 with os.scandir(self.path) as iterator:
4134 next(iterator)
4135 with self.check_no_resource_warning():
4136 del iterator
4137
4138 def test_context_manager_close(self):
4139 self.create_file("file.txt")
4140 self.create_file("file2.txt")
4141 with os.scandir(self.path) as iterator:
4142 next(iterator)
4143 iterator.close()
4144
4145 def test_context_manager_exception(self):
4146 self.create_file("file.txt")
4147 self.create_file("file2.txt")
4148 with self.assertRaises(ZeroDivisionError):
4149 with os.scandir(self.path) as iterator:
4150 next(iterator)
4151 1/0
4152 with self.check_no_resource_warning():
4153 del iterator
4154
4155 def test_resource_warning(self):
4156 self.create_file("file.txt")
4157 self.create_file("file2.txt")
4158 iterator = os.scandir(self.path)
4159 next(iterator)
4160 with self.assertWarns(ResourceWarning):
4161 del iterator
4162 support.gc_collect()
4163 # exhausted iterator
4164 iterator = os.scandir(self.path)
4165 list(iterator)
4166 with self.check_no_resource_warning():
4167 del iterator
4168
Victor Stinner6036e442015-03-08 01:58:04 +01004169
Ethan Furmancdc08792016-06-02 15:06:09 -07004170class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004171
4172 # Abstracted so it can be overridden to test pure Python implementation
4173 # if a C version is provided.
4174 fspath = staticmethod(os.fspath)
4175
Ethan Furmancdc08792016-06-02 15:06:09 -07004176 def test_return_bytes(self):
4177 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004178 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07004179
4180 def test_return_string(self):
4181 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004182 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07004183
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004184 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004185 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004186 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004187
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004188 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07004189 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4190 self.assertEqual("path/like/object", os.fsdecode(pathlike))
4191
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004192 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004193 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4194 self.assertTrue(issubclass(FakePath, os.PathLike))
4195 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07004196
Ethan Furmancdc08792016-06-02 15:06:09 -07004197 def test_garbage_in_exception_out(self):
4198 vapor = type('blah', (), {})
4199 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004200 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07004201
4202 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07004203 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004204
Brett Cannon044283a2016-07-15 10:41:49 -07004205 def test_bad_pathlike(self):
4206 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004207 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07004208 # __fspath__ attribute that is not callable.
4209 c = type('foo', (), {})
4210 c.__fspath__ = 1
4211 self.assertRaises(TypeError, self.fspath, c())
4212 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07004213 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02004214 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004215
Bar Hareleae87e32019-12-22 11:57:27 +02004216 def test_pathlike_subclasshook(self):
4217 # bpo-38878: subclasshook causes subclass checks
4218 # true on abstract implementation.
4219 class A(os.PathLike):
4220 pass
4221 self.assertFalse(issubclass(FakePath, A))
4222 self.assertTrue(issubclass(FakePath, os.PathLike))
4223
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004224 def test_pathlike_class_getitem(self):
Guido van Rossum48b069a2020-04-07 09:50:06 -07004225 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004226
Victor Stinnerc29b5852017-11-02 07:28:27 -07004227
4228class TimesTests(unittest.TestCase):
4229 def test_times(self):
4230 times = os.times()
4231 self.assertIsInstance(times, os.times_result)
4232
4233 for field in ('user', 'system', 'children_user', 'children_system',
4234 'elapsed'):
4235 value = getattr(times, field)
4236 self.assertIsInstance(value, float)
4237
4238 if os.name == 'nt':
4239 self.assertEqual(times.children_user, 0)
4240 self.assertEqual(times.children_system, 0)
4241 self.assertEqual(times.elapsed, 0)
4242
4243
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004244# Only test if the C version is provided, otherwise TestPEP519 already tested
4245# the pure Python implementation.
4246if hasattr(os, "_fspath"):
4247 class TestPEP519PurePython(TestPEP519):
4248
4249 """Explicitly test the pure Python implementation of os.fspath()."""
4250
4251 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004252
4253
Fred Drake2e2be372001-09-20 21:33:42 +00004254if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004255 unittest.main()