blob: 15fd65b55149e77b37433f5f6148481db1cc00d6 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020012import itertools
13import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000014import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020015import os
16import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020017import shutil
18import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000019import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010020import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020021import subprocess
22import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010023import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020024import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
# Optional, platform-specific modules: each name is bound to None when the
# module cannot be imported.
try:
    import resource
except ImportError:
    resource = None

try:
    import fcntl
except ImportError:
    fcntl = None

try:
    import _winapi
except ImportError:
    _winapi = None

# uids of every entry returned by pwd.getpwall(); empty when pwd is missing
# or does not provide getpwall().
try:
    import pwd
    all_users = [entry.pw_uid for entry in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []

# Without _testcapi, fall back to sys.maxsize for both limits.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    PY_SSIZE_T_MAX = sys.maxsize
    INT_MAX = PY_SSIZE_T_MAX
Antoine Pitrouec34ab52013-08-16 20:44:38 +020054
Berker Peksagce643912015-05-06 06:33:17 +030055from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020056from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000057
Victor Stinner923590e2016-03-24 09:11:48 +010058
# True only when os.geteuid() exists and reports effective uid 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
74
Victor Stinner923590e2016-03-24 09:11:48 +010075
def requires_os_func(name):
    """Return a decorator that skips the test unless os has attribute *name*."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
78
79
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation mode ("x"), so open() raises
    FileExistsError if the path already exists.  Buffering is disabled.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
83
84
Victor Stinner689830e2019-06-26 17:31:12 +020085class MiscTests(unittest.TestCase):
86 def test_getcwd(self):
87 cwd = os.getcwd()
88 self.assertIsInstance(cwd, str)
89
Victor Stinnerec3e20a2019-06-28 18:01:59 +020090 def test_getcwd_long_path(self):
91 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
92 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
93 # longer path if longer paths support is enabled. Internally, the os
94 # module uses MAXPATHLEN which is at least 1024.
95 #
96 # Use a directory name of 200 characters to fit into Windows MAX_PATH
97 # limit.
98 #
99 # On Windows, the test can stop when trying to create a path longer
100 # than MAX_PATH if long paths support is disabled:
101 # see RtlAreLongPathsEnabled().
102 min_len = 2000 # characters
103 dirlen = 200 # characters
104 dirname = 'python_test_dir_'
105 dirname = dirname + ('a' * (dirlen - len(dirname)))
106
107 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200108 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200109 expected = path
110
111 while True:
112 cwd = os.getcwd()
113 self.assertEqual(cwd, expected)
114
115 need = min_len - (len(cwd) + len(os.path.sep))
116 if need <= 0:
117 break
118 if len(dirname) > need and need > 0:
119 dirname = dirname[:need]
120
121 path = os.path.join(path, dirname)
122 try:
123 os.mkdir(path)
124 # On Windows, chdir() can fail
125 # even if mkdir() succeeded
126 os.chdir(path)
127 except FileNotFoundError:
128 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
129 # ERROR_FILENAME_EXCED_RANGE (206) errors
130 # ("The filename or extension is too long")
131 break
132 except OSError as exc:
133 if exc.errno == errno.ENAMETOOLONG:
134 break
135 else:
136 raise
137
138 expected = path
139
140 if support.verbose:
141 print(f"Tested current directory length: {len(cwd)}")
142
Victor Stinner689830e2019-06-26 17:31:12 +0200143 def test_getcwdb(self):
144 cwd = os.getcwdb()
145 self.assertIsInstance(cwd, bytes)
146 self.assertEqual(os.fsdecode(cwd), os.getcwd())
147
148
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000149# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the low-level file functions of os, run on support.TESTFN.

    setUp() and tearDown() are the same function: support.TESTFN is removed
    before and after every test.
    """

    def setUp(self):
        # lexists() is also true for a dangling symbolic link.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must leave the reference count of its
        # argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails:
            # a file left open cannot be unlinked by tearDown() on Windows.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Separate name: in_file is still open and closed by a cleanup.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Separate name: in_file is still open and closed by a cleanup.
            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200372
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000373# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the objects returned by os.stat() and os.statvfs().

    setUp() creates support.TESTFN holding the three bytes b"ABC"; the size
    checks below rely on that.
    """

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check the os.stat() result of *fname* (str or bytes path)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The st_*time attributes are truncated to int before
                    # being compared with the indexed items.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # Deliberately lenient: a TypeError is tolerated but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # Deliberately lenient: a TypeError is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an int st_file_attributes within 32 bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
592
Victor Stinner47aacc82015-06-12 17:26:23 +0200593
594class UtimeTests(unittest.TestCase):
595 def setUp(self):
596 self.dirname = support.TESTFN
597 self.fname = os.path.join(self.dirname, "f1")
598
599 self.addCleanup(support.rmtree, self.dirname)
600 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100601 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
Victor Stinner47aacc82015-06-12 17:26:23 +0200603 def support_subsecond(self, filename):
604 # Heuristic to check if the filesystem supports timestamp with
605 # subsecond resolution: check if float and int timestamps are different
606 st = os.stat(filename)
607 return ((st.st_atime != st[7])
608 or (st.st_mtime != st[8])
609 or (st.st_ctime != st[9]))
610
611 def _test_utime(self, set_time, filename=None):
612 if not filename:
613 filename = self.fname
614
615 support_subsecond = self.support_subsecond(filename)
616 if support_subsecond:
617 # Timestamp with a resolution of 1 microsecond (10^-6).
618 #
619 # The resolution of the C internal function used by os.utime()
620 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
621 # test with a resolution of 1 ns requires more work:
622 # see the issue #15745.
623 atime_ns = 1002003000 # 1.002003 seconds
624 mtime_ns = 4005006000 # 4.005006 seconds
625 else:
626 # use a resolution of 1 second
627 atime_ns = 5 * 10**9
628 mtime_ns = 8 * 10**9
629
630 set_time(filename, (atime_ns, mtime_ns))
631 st = os.stat(filename)
632
633 if support_subsecond:
634 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
635 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
636 else:
637 self.assertEqual(st.st_atime, atime_ns * 1e-9)
638 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
639 self.assertEqual(st.st_atime_ns, atime_ns)
640 self.assertEqual(st.st_mtime_ns, mtime_ns)
641
642 def test_utime(self):
643 def set_time(filename, ns):
644 # test the ns keyword parameter
645 os.utime(filename, ns=ns)
646 self._test_utime(set_time)
647
648 @staticmethod
649 def ns_to_sec(ns):
650 # Convert a number of nanosecond (int) to a number of seconds (float).
651 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
652 # issue, os.utime() rounds towards minus infinity.
653 return (ns * 1e-9) + 0.5e-9
654
655 def test_utime_by_indexed(self):
656 # pass times as floating point seconds as the second indexed parameter
657 def set_time(filename, ns):
658 atime_ns, mtime_ns = ns
659 atime = self.ns_to_sec(atime_ns)
660 mtime = self.ns_to_sec(mtime_ns)
661 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
662 # or utime(time_t)
663 os.utime(filename, (atime, mtime))
664 self._test_utime(set_time)
665
666 def test_utime_by_times(self):
667 def set_time(filename, ns):
668 atime_ns, mtime_ns = ns
669 atime = self.ns_to_sec(atime_ns)
670 mtime = self.ns_to_sec(mtime_ns)
671 # test the times keyword parameter
672 os.utime(filename, times=(atime, mtime))
673 self._test_utime(set_time)
674
675 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
676 "follow_symlinks support for utime required "
677 "for this test.")
678 def test_utime_nofollow_symlinks(self):
679 def set_time(filename, ns):
680 # use follow_symlinks=False to test utimensat(timespec)
681 # or lutimes(timeval)
682 os.utime(filename, ns=ns, follow_symlinks=False)
683 self._test_utime(set_time)
684
685 @unittest.skipUnless(os.utime in os.supports_fd,
686 "fd support for utime required for this test.")
687 def test_utime_fd(self):
688 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100689 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200690 # use a file descriptor to test futimens(timespec)
691 # or futimes(timeval)
692 os.utime(fp.fileno(), ns=ns)
693 self._test_utime(set_time)
694
695 @unittest.skipUnless(os.utime in os.supports_dir_fd,
696 "dir_fd support for utime required for this test.")
697 def test_utime_dir_fd(self):
698 def set_time(filename, ns):
699 dirname, name = os.path.split(filename)
700 dirfd = os.open(dirname, os.O_RDONLY)
701 try:
702 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
703 os.utime(name, dir_fd=dirfd, ns=ns)
704 finally:
705 os.close(dirfd)
706 self._test_utime(set_time)
707
708 def test_utime_directory(self):
709 def set_time(filename, ns):
710 # test calling os.utime() on a directory
711 os.utime(filename, ns=ns)
712 self._test_utime(set_time, filename=self.dirname)
713
714 def _test_utime_current(self, set_time):
715 # Get the system clock
716 current = time.time()
717
718 # Call os.utime() to set the timestamp to the current system clock
719 set_time(self.fname)
720
721 if not self.support_subsecond(self.fname):
722 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700723 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200724 # On Windows, the usual resolution of time.time() is 15.6 ms.
725 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700726 #
727 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
728 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200729 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200730 st = os.stat(self.fname)
731 msg = ("st_time=%r, current=%r, dt=%r"
732 % (st.st_mtime, current, st.st_mtime - current))
733 self.assertAlmostEqual(st.st_mtime, current,
734 delta=delta, msg=msg)
735
736 def test_utime_current(self):
737 def set_time(filename):
738 # Set to the current time in the new way
739 os.utime(self.fname)
740 self._test_utime_current(set_time)
741
def test_utime_current_old(self):
    def set_time(filename):
        # Set to the current time in the old explicit way (times=None).
        # Operate on the path handed in by the checker rather than
        # reaching around it to self.fname, so the helper honours its
        # own parameter like the other set_time helpers do.
        os.utime(filename, None)
    self._test_utime_current(set_time)
747
def get_file_system(self, path):
    """Return the file system name of the volume holding *path*, or None.

    The lookup is only implemented on Windows, through
    GetVolumeInformationW().  On every other platform, and when the
    Windows call reports failure, None is returned.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    name_buf = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        drive + '\\', None, 0,
        None, None, None,
        name_buf, len(name_buf))
    return name_buf.value if succeeded else None
760
def test_large_time(self):
    # Many filesystems are limited to the year 2038. At least, the test
    # pass with NTFS filesystem.
    filesystem = self.get_file_system(self.dirname)
    if filesystem != "NTFS":
        self.skipTest("requires NTFS")

    far_future = 5000000000   # some day in 2128
    os.utime(self.fname, (far_future, far_future))
    mtime = os.stat(self.fname).st_mtime
    self.assertEqual(mtime, far_future)
770
def test_utime_invalid_arguments(self):
    path = self.fname
    expect = self.assertRaises

    # seconds and nanoseconds parameters are mutually exclusive
    expect(ValueError, os.utime, path, (5, 5), ns=(5, 5))

    # times must be a tuple of exactly two items
    expect(TypeError, os.utime, path, [5, 5])
    expect(TypeError, os.utime, path, (5,))
    expect(TypeError, os.utime, path, (5, 5, 5))

    # the same holds for ns
    expect(TypeError, os.utime, path, ns=[5, 5])
    expect(TypeError, os.utime, path, ns=(5,))
    expect(TypeError, os.utime, path, ns=(5, 5, 5))

    # Features the platform lacks must be rejected explicitly.
    if os.utime not in os.supports_follow_symlinks:
        expect(NotImplementedError,
               os.utime, path, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(path, 'wb', 0) as fp:
            expect(TypeError, os.utime, fp.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        expect(NotImplementedError, os.utime, path, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200798
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def get_bad_int(divmod_ret_val):
        # Build an object whose __divmod__ returns the given value
        # whatever it is called with.
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    # A non-tuple, an empty tuple and a 3-tuple are all invalid results.
    for bogus_result in (42, (), (1, 2, 3)):
        self.assertRaises(TypeError, os.utime, self.fname,
                          ns=(get_bad_int(bogus_result), 1))
814
Victor Stinner47aacc82015-06-12 17:26:23 +0200815
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000816from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000817
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the real environment so tearDown() can put it back, then
        # seed os.environ with the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name in reference:
            os.environ[name] = reference[name]

    def tearDown(self):
        # Restore the str view first, then the bytes view when it exists.
        os.environ.clear()
        os.environ.update(self.__save)
        if not os.supports_bytes_environ:
            return
        os.environb.clear()
        os.environb.update(self.__saveb)

    def _reference(self):
        # KEY1..KEY3 mapped to VALUE1..VALUE3.
        return {"KEY%d" % n: "VALUE%d" % n for n in (1, 2, 3)}

    def _empty_mapping(self):
        environ = os.environ
        environ.clear()
        return environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ['{!r}: {!r}'.format(name, content)
                 for name, content in env.items()]
        expected = 'environ({{{}}})'.format(', '.join(pairs))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself with a plain dict.
        original_environ = os.environ
        os.environ = dict(test_env)
        try:
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s"
                          % (fs_encoding,))
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_decoded)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Both lookup and deletion must raise a KeyError that carries the
        # very same key object and has its context suppressed.
        for operation in (os.environ.__getitem__, os.environ.__delitem__):
            with self.assertRaises(KeyError) as caught:
                operation(missing)
            self.assertIs(caught.exception.args[0], missing)
            self.assertTrue(caught.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        """Iterate over *collection* while os.environ gains a new key."""
        added_key = "__new_key__"
        added_value = "test_environ_iteration"
        walker = iter(collection)

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[added_key] = added_value

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[added_key], added_value)
        finally:
            del os.environ[added_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
996
Victor Stinner6d101392013-04-14 16:35:04 +0200997
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # The file is named like its variable, the diagram above and the
        # broken_link3 target below.
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Make SUB21 unreadable.  If it can still be listed afterwards,
        # remove it from the tree and from the expected result instead.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Restore access before the tree is removed at cleanup time.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            # Drop "SUB21" (the first entry) from the expected dirs.
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 after it has been listed but before it is visited.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            # Put the directory back under its original name.
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001171
Charles-François Natali7372b062012-02-05 15:15:38 +01001172
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests
        # expect by dropping the directory file descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single entry point to os.fwalk() used by every test in this class.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        Compare fwalk() results with walk() results.

        Both keyword dicts are copied, then updated with every combination
        of topdown and follow-symlinks before each comparison.  Every root
        yielded by fwalk() must have been yielded by os.walk() with the same
        sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the real error must not be
        # masked by an UnboundLocalError raised from the cleanup code.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1240
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Translate the fwalk-style keyword to the one os.walk() accepts.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            # Hand str names to the caller ...
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # ... and copy any changes the caller made back into the
            # bytes lists owned by os.walk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1253
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            # Hand str names to the caller ...
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # ... and copy any changes the caller made back into the
            # bytes lists owned by os.fwalk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1265
Charles-François Natali7372b062012-02-05 15:15:38 +01001266
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The leaf is expected to get the requested mode and the
                # intermediate directory the default one; both are
                # filtered by the umask set above.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # Use the context manager so the umask is restored even when one
        # of the checks below fails.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() only removes directories
            # and would fail on a leftover regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1359
Andrew Svetlov405faed2012-12-25 12:18:09 +02001360
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory created at TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named 'st' rather than 'stat' so the stat module is not shadowed.
        st = os.stat(support.TESTFN)
        uid, gid = st.st_uid, st.st_gid
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and check that each change is visible.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and check that each change is visible.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one context manager: the block ends at the first
        # PermissionError, so the second chown() only runs when the first
        # one succeeded.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1419
1420
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a two-level tree below TESTFN."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_tree(self):
        """Create TESTFN/dira/dirb and return the (dira, dirb) paths."""
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory is empty: the whole chain up to TESTFN goes away.
        dira, dirb = self._make_tree()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning right after dirb is removed.
        dira, dirb = self._make_tree()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb makes the very first removal fail with OSError.
        dira, dirb = self._make_tree()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1460
1461
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # Third positional argument 0 requests an unbuffered binary file.
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            # Explicit close inside the with block, kept from the original.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001469
Andrew Svetlov405faed2012-12-25 12:18:09 +02001470
class URandomTests(unittest.TestCase):
    """Tests for os.urandom(): result length, type and variability."""

    def test_urandom_length(self):
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return the stdout of a child Python that writes os.urandom(count).

        Also asserts that exactly *count* bytes were written.
        """
        lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        result = assert_python_ok('-c', '\n'.join(lines))
        # Index 1 of the assert_python_ok() result is the child's stdout.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001500
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001501
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1543
1544
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the build-time configuration variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(name) == 1
    for name in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001551
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter, because the scenarios lower
    the file descriptor limit or close descriptors.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two 4-byte reads of one run must differ ...
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        # ... and so must the output of two separate runs.
        self.assertNotEqual(out2, out)
1631
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001632
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The value bound by ``with ... as`` is the list of
    recorded calls.  os.execv, os.execve and os.defpath are restored when
    the block exits, including when it exits with an exception.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block.  If one of these
    # lookups fails, nothing has been patched yet, and the error is not
    # masked by the finally clause touching names that were never bound.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1665
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() functions, mostly argument validation."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence, or one whose first item is the empty
        # string, must raise ValueError.
        for arglist in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', arglist)

    def test_execvpe_with_bad_arglist(self):
        for arglist, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', arglist, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with execv/execve mocked out.

        *test_type* is either str or bytes and selects the type used for
        the program name and path.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the recorded path is expected as bytes.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            # The PATH key uses the same type as the program name.
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_items = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_items:
            # Each case starts from a fresh copy of the real environment.
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            self.assertTrue(exc.winerror is None or exc.winerror != 0)
        else:
            self.fail('No OSError raised')
1770
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001771
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing or clashing path."""

    def setUp(self):
        # Every test relies on TESTFN not existing beforehand.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: nothing exists at TESTFN.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file; mkdir() on the same name must then fail
        # while the file is still open.
        with open(support.TESTFN, "x"):
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001806
Victor Stinnere77c9742016-03-25 10:28:23 +01001807
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when given an invalid file descriptor.

    Most tests go through check(), which expects OSError with errno EBADF.
    """

    # Names of os functions that take a file descriptor as their only
    # argument; one test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for the os function named f.  The generated
        # test does nothing when os has no attribute of that name.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Runs in the class body: each assignment into locals() adds a
    # test_<name> entry to the class namespace.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require OSError with errno EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                # fd+i is a valid descriptor: stop before it.
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        # closerange() over invalid descriptors must return None, not raise.
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf() is given the bad descriptor as well.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate() is given the bad descriptor as well.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1906
Brian Curtin1b9df392010-11-24 20:24:31 +00001907
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Either file may be missing, depending on how far the test got.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it to *file2* and compare the two.

        The test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            same = os.path.sameopenfile(f1.fileno(), f2.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Updating the attributes lets tearDown() remove the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1944
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument validation tests for the os.set*uid() / os.set*gid() family.

    Each test checks three things: setting id 0 without privilege raises
    OSError, a non-integer argument raises TypeError, and an out-of-range
    value raises OverflowError.
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # Exercise seteuid() itself here; the TypeError check used to call
        # os.setegid() by mistake, which is covered by test_setegid().
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002012
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # The fixture is a directory holding one empty file for every candidate
    # non-ASCII name that os.fsencode() accepts; the tests then run a few os
    # functions over those names.

    def setUp(self):
        # Same preference order as an if/elif/else chain: unencodable name,
        # then non-ASCII name, then the plain test file name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        candidates.extend(
            name
            for name in (support.TESTFN_UNENCODABLE, support.TESTFN_NONASCII)
            if name)

        encoded_names = []
        for name in candidates:
            try:
                encoded_names.append(os.fsencode(name))
            except UnicodeEncodeError:
                # Skip names the filesystem encoding cannot represent.
                continue
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Remove the partially built directory, then propagate.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002084
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows: plain integer "signals" that become
    # the child's exit code, and console control events.

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the tries are used up or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Run win_console_handler.py, wait until it flags readiness through
        # a one-byte shared memory mapping, send it *event* with os.kill()
        # and fail if it is still running half a second later.  *name* is
        # only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child has already exited; in that
            # case there is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is running; an exit
        # status of 0 must not be mistaken for "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2199
2200
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, FILE0, SUB1, FILE1 and remember
        # the entry names in sorted order.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        bytes_listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(bytes_listing, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        str_listing = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        extended_bytes = b'\\\\?\\' + os.fsencode(absolute)
        bytes_listing = sorted(os.listdir(extended_bytes))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(bytes_listing, expected_bytes)
Tim Golden781bbeb2013-10-25 20:24:06 +01002247
2248
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # str and bytes spellings of the link name and of its target (this file).
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        wrapped_target = FakePath(self.filelink_target)
        for not_a_link in (self.filelink_target, wrapped_target):
            with self.assertRaises(OSError):
                os.readlink(not_a_link)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2294
2295
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    # Link names are relative, so the links are created in the current
    # directory; the targets are this file and the directory containing it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than assert
        # statements so that they are still checked under python -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): this link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() through the link must equal stat() of the target, while
        # lstat() of the link must differ; checked for str and bytes paths.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                            and os.path.exists(r'C:\ProgramData'),
                            'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            attempts = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in attempts:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    continue
                # The link was created after all; remove it, best effort.
                try:
                    os.remove(dest)
                except OSError:
                    pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002455
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # The junction is created in the current directory and points at the
    # directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions use unittest assertions rather than assert
        # statements so that they are still checked under python -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2485
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Repeated nt._getfinalpathname() and os.stat() calls, whether they
        # succeed or raise, must leave the process handle count unchanged.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes
        wintypes = ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE
        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        def open_handle_count():
            # Ask Windows for the current handle count of this process.
            counter = wintypes.DWORD()
            ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(counter))
            self.assertEqual(1, ok)
            return counter.value

        before_count = open_handle_count()

        # __file__ tests the success path; the other names are meant to
        # hit the error path (any exception is tolerated below).
        probes = [r'\\?\C:', r'\\?\NUL', r'\\?\CONIN', __file__]

        for _ in range(10):
            for name in probes:
                # Failure is expected
                with contextlib.suppress(Exception):
                    nt._getfinalpathname(name)
                with contextlib.suppress(Exception):
                    os.stat(name)

        handle_delta = open_handle_count() - before_count
        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002535
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion instead of an assert statement: the latter
        # is compiled away under python -O and the test would check nothing.
        self.assertTrue(os.path.isdir(src))
2566
2567
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # fsencode() of bytes and fsdecode() of str hand the value back
        # unchanged.
        raw = b'abc\xff'
        self.assertEqual(os.fsencode(raw), raw)
        text = 'abc\u0141'
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for sample in samples:
            try:
                encoded = os.fsencode(sample)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), sample)
Victor Stinnere8d51452010-08-19 01:05:19 +00002581
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002582
Brett Cannonefb00c02012-02-29 18:31:31 -05002583
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec Python can look up.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2597
2598
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child = subprocess.Popen(
            [sys.executable, '-c', 'import os; print(os.getppid())'],
            stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = FakePath(argv[0])
        pid = os.spawnv(os.P_NOWAIT, program, argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2615
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002616
Victor Stinner4659ccf2016-09-14 10:57:00 +02002617class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002618 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002619 self.exitcode = 17
2620
2621 filename = support.TESTFN
2622 self.addCleanup(support.unlink, filename)
2623
2624 if not with_env:
2625 code = 'import sys; sys.exit(%s)' % self.exitcode
2626 else:
2627 self.env = dict(os.environ)
2628 # create an unique key
2629 self.key = str(uuid.uuid4())
2630 self.env[self.key] = self.key
2631 # read the variable from os.environ to check that it exists
2632 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2633 % (self.key, self.exitcode))
2634
2635 with open(filename, "w") as fp:
2636 fp.write(code)
2637
Berker Peksag81816462016-09-15 20:19:47 +03002638 args = [sys.executable, filename]
2639 if use_bytes:
2640 args = [os.fsencode(a) for a in args]
2641 self.env = {os.fsencode(k): os.fsencode(v)
2642 for k, v in self.env.items()}
2643
2644 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002645
Berker Peksag4af23d72016-09-15 20:32:44 +03002646 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002647 def test_spawnl(self):
2648 args = self.create_args()
2649 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2650 self.assertEqual(exitcode, self.exitcode)
2651
Berker Peksag4af23d72016-09-15 20:32:44 +03002652 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002653 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002654 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002655 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2656 self.assertEqual(exitcode, self.exitcode)
2657
Berker Peksag4af23d72016-09-15 20:32:44 +03002658 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002659 def test_spawnlp(self):
2660 args = self.create_args()
2661 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2662 self.assertEqual(exitcode, self.exitcode)
2663
Berker Peksag4af23d72016-09-15 20:32:44 +03002664 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002665 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002666 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002667 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2668 self.assertEqual(exitcode, self.exitcode)
2669
Berker Peksag4af23d72016-09-15 20:32:44 +03002670 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002671 def test_spawnv(self):
2672 args = self.create_args()
2673 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2674 self.assertEqual(exitcode, self.exitcode)
2675
Berker Peksag4af23d72016-09-15 20:32:44 +03002676 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002677 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002678 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002679 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2680 self.assertEqual(exitcode, self.exitcode)
2681
Berker Peksag4af23d72016-09-15 20:32:44 +03002682 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002683 def test_spawnvp(self):
2684 args = self.create_args()
2685 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2686 self.assertEqual(exitcode, self.exitcode)
2687
Berker Peksag4af23d72016-09-15 20:32:44 +03002688 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002689 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002690 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002691 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2692 self.assertEqual(exitcode, self.exitcode)
2693
Berker Peksag4af23d72016-09-15 20:32:44 +03002694 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002695 def test_nowait(self):
2696 args = self.create_args()
2697 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2698 result = os.waitpid(pid, 0)
2699 self.assertEqual(result[0], pid)
2700 status = result[1]
2701 if hasattr(os, 'WIFEXITED'):
2702 self.assertTrue(os.WIFEXITED(status))
2703 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2704 else:
2705 self.assertEqual(status, self.exitcode << 8)
2706
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
    # Test bytes handling in parse_arglist and parse_envlist (#28114)
    argv = self.create_args(with_env=True, use_bytes=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2713
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
    # spawnl() must raise ValueError both when no argument follows the
    # program path and when the only argument is an empty string.
    program = self.create_args()[0]
    for extra in ((), ('',)):
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT,
                          program, *extra)
Steve Dower859fd7b2016-11-19 18:53:19 -08002719
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
    # spawnle() takes the environment as its last positional argument;
    # it must raise ValueError when the arguments before it are missing
    # or consist of a single empty string.
    program = self.create_args()[0]
    for extra in ((), ('',)):
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT,
                          program, *extra, {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002725
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
    # Empty argument sequences, and sequences holding only an empty
    # string, must be rejected with ValueError (tuple and list forms).
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT,
                          program, bad_argv)
Steve Dower859fd7b2016-11-19 18:53:19 -08002733
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
    # Same invalid argument sequences as for spawnv(), with an empty
    # environment mapping supplied as the last argument.
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT,
                          program, bad_argv, {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002741
def _test_invalid_env(self, spawn):
    """Exercise *spawn* with problematic environment entries.

    *spawn* is called as ``spawn(mode, path, args, env)``.  For each of
    the first three entries the call may either raise ValueError or
    return exit code 127.  The last entry (an equal sign inside a value)
    must be passed through to the child unchanged.
    """
    args = [sys.executable, '-c', 'pass']

    rejected_entries = (
        # null character in the environment variable name
        ("FRUIT\0VEGETABLE", "cabbage"),
        # null character in the environment variable value
        ("FRUIT", "orange\0VEGETABLE=cabbage"),
        # equal character in the environment variable name
        ("FRUIT=ORANGE", "lemon"),
    )
    for name, value in rejected_entries:
        newenv = os.environ.copy()
        newenv[name] = value
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        except ValueError:
            continue
        self.assertEqual(exitcode, 127)

    # equal character in the environment variable value
    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)
    # The child script fails unless it sees the exact value set below.
    script = ('import sys, os\n'
              'if os.getenv("FRUIT") != "orange=lemon":\n'
              ' raise AssertionError')
    with open(filename, "w") as fp:
        fp.write(script)
    args = [sys.executable, filename]
    newenv = os.environ.copy()
    newenv["FRUIT"] = "orange=lemon"
    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
    self.assertEqual(exitcode, 0)
2787
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnve().
    spawn = os.spawnve
    self._test_invalid_env(spawn)
2791
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnvpe().
    spawn = os.spawnvpe
    self._test_invalid_env(spawn)
2795
Serhiy Storchaka77703942017-06-25 07:33:01 +03002796
# When this TestCase was introduced it triggered at least two different
# errors on *nix buildbots.  It is skipped for the time being so that the
# buildbots can move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Smoke test for os.getlogin() (currently always skipped)."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2805
2806
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's priority value by one, read it back, and
        # finally try to put the original value back.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Starting at 19 or above, the result cannot be told
                # apart from "nothing changed".
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Restoring is best effort: EACCES is tolerated, anything
            # else is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2829
2830
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening socket driven by an asyncore loop in its own thread.

    Each accepted connection is wrapped in a ``Handler`` and stored in
    ``handler_instance`` (only the most recent one is kept).  ``host``
    and ``port`` hold the address the socket was actually bound to.
    """

    class Handler(asynchat.async_chat):
        """Connection handler that records the data sent by the peer."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When set to False, incoming data is read and thrown away.
            self.accumulate = True
            self.closed = False
            # Greeting pushed to the peer as soon as it is connected.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as a single bytes."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Propagate the exception that is currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True between the moment run() starts and a call to stop()."""
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to finish and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # Use the lock as a context manager so that it is released
            # even if an exception escapes from asyncore.loop().
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Propagate the exception that is currently being handled.
        raise
2915
2916
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending a file to a SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # Headers/trailers are exercised everywhere except on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')
    requires_32b = unittest.skipUnless(
        sys.maxsize < 2**32, 'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """Call os.sendfile() the way an application is expected to.

        The call is retried for as long as it fails with EAGAIN or
        EBUSY.  Any other OSError (ECONNRESET included) is propagated.
        Returns whatever os.sendfile() returns.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a reserved word, hence the dict unpacking).
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared instead of the payloads,
        # presumably to keep failure output small -- confirm.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are requested, followed
            # by both trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are accepted outcomes; anything else is an
        # error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003121
3122
def supports_extended_attributes():
    """Return True if an extended attribute can be set on a test file.

    Returns False when os.setxattr does not exist or when setting a
    ``user.test`` attribute on a freshly created file raises OSError.
    The probe file is removed in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe = support.TESTFN
    try:
        # "x" mode: creating the probe fails if the file already exists.
        with open(probe, "xb", 0) as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                usable = False
            else:
                usable = True
    finally:
        support.unlink(probe)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07003137
3138
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario with attribute names converted by *s*.

        *s* turns a str attribute name into the type under test.  The
        four callables are invoked as ``func(path, ...)`` and **kwargs
        is forwarded to every call except listxattr.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        test_name = s("user.test")
        test2_name = s("user.test2")

        # Nothing has been set yet.
        with self.assertRaises(OSError) as cm:
            getxattr(path, test_name, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute, then replace its value.
        setxattr(path, test_name, b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, test_name, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE must fail on an existing attribute ...
        with self.assertRaises(OSError) as cm:
            setxattr(path, test_name, b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # ... and XATTR_REPLACE must fail on a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(path, test2_name, b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(path, test2_name, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, test_name, **kwargs)

        with self.assertRaises(OSError) as cm:
            getxattr(path, test_name, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, test2_name, **kwargs), b"foo")

        # Round-trip a 1024-byte value.
        big_value = b"a"*1024
        setxattr(path, test_name, big_value, **kwargs)
        self.assertEqual(getxattr(path, test_name, **kwargs), big_value)
        removexattr(path, test_name, **kwargs)

        # Set one hundred attributes and check that all are listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then with os.fsencode()d names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3222
3223
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            tolerated = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not tolerated:
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        stty_failures = (FileNotFoundError, subprocess.CalledProcessError,
                         PermissionError)
        try:
            output = subprocess.check_output(['stty', 'size'])
        except stty_failures:
            self.skipTest("stty invocation failed")
        size = output.decode().split()
        expected = (int(size[1]), int(size[0]))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            tolerated = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not tolerated:
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3267
3268
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    """Tests for os.memfd_create()."""

    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag: it must
        # be valid, non-inheritable and writable.
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        # closefd=False: closing is left to the cleanup registered above.
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created without flags must be non-inheritable too.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3284
3285
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        unencodable = support.TESTFN_UNENCODABLE
        decoded = unencodable if unencodable is not None else support.TESTFN
        # A plain str and a str subclass instance.
        self.unicode_filenames = [decoded, Str(decoded)]

        undecodable = support.TESTFN_UNDECODABLE
        if undecodable is not None:
            encoded = undecodable
        else:
            encoded = os.fsencode(support.TESTFN)
        # bytes plus two other bytes-like representations of the name.
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"

        # Each entry is (filenames to try, function, *extra arguments).
        funcs = [
            (self.filenames, os.chdir,),
            (self.filenames, os.chmod, 0o777),
            (self.filenames, os.lstat,),
            (self.filenames, os.open, os.O_RDONLY),
            (self.filenames, os.rmdir,),
            (self.filenames, os.stat,),
            (self.filenames, os.unlink,),
        ]
        if on_windows:
            # On Windows the destination type is matched to the source.
            funcs += [
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                (self.unicode_filenames, os.listdir, ),
            ]
        else:
            funcs += [
                (self.filenames, os.listdir,),
                (self.filenames, os.rename, "dst"),
                (self.filenames, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((self.filenames, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((self.bytes_filenames, os.link, b"dst"))
                funcs.append((self.unicode_filenames, os.link, "dst"))
            else:
                funcs.append((self.filenames, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (self.filenames, os.listxattr,),
                (self.filenames, os.getxattr, "user.test"),
                (self.filenames, os.setxattr, "user.test", b'user'),
                (self.filenames, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((self.filenames, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((self.filenames, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    if isinstance(name, (str, bytes)):
                        func(name, *func_args)
                    else:
                        # Other bytes-like names must also emit a
                        # DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                    continue
                except UnicodeDecodeError:
                    continue
                self.fail("No exception thrown by {}".format(func))
3378
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        # os.cpu_count() returns either None or an int; when a value is
        # reported it must be strictly positive.
        cpus = os.cpu_count()
        if cpus is not None:
            self.assertIsInstance(cpus, int)
            self.assertGreater(cpus, 0)
        else:
            self.skipTest("Could not determine the number of CPUs")
3387
Victor Stinnerdaf45552013-08-28 00:53:59 +02003388
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os.

    Each test registers os.close() as a cleanup for every descriptor it
    opens, so descriptors are released even when an assertion fails.
    """

    def test_get_set_inheritable(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags &= ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

        # get_inheritable() must reflect the change made through fcntl.
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         fcntl.FD_CLOEXEC)

        # set_inheritable(True) must be visible as a cleared FD_CLOEXEC.
        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         0)

    def test_open(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        self.assertEqual(os.get_inheritable(rfd), False)
        self.assertEqual(os.get_inheritable(wfd), False)

    def test_dup(self):
        fd1 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd1)

        fd2 = os.dup(fd1)
        self.addCleanup(os.close, fd2)
        self.assertEqual(os.get_inheritable(fd2), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        fd1 = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, fd1)
        fd2 = os.dup(fd1)
        self.addCleanup(os.close, fd2)
        self.assertFalse(os.get_inheritable(fd2))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        # inheritable by default
        fd2 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd2)
        self.assertEqual(os.dup2(fd, fd2), fd2)
        self.assertTrue(os.get_inheritable(fd2))

        # force non-inheritable
        fd3 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd3)
        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
        self.assertFalse(os.get_inheritable(fd3))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3476
3477
class PathTConverterTests(unittest.TestCase):
    """Exercise path argument conversion for a selection of os functions."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows; None entries are
            # skipped in the loop below.
            bytes_fspath = bytes_filename = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: the descriptor is closed before
        # the file is unlinked.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # __fspath__() returning an int must be rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)

    def test_path_t_converter_and_custom_class(self):
        # The error message must name the type __fspath__() returned.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        with self.assertRaisesRegex(TypeError, msg % r'int'):
            os.stat(FakePath(2))
        with self.assertRaisesRegex(TypeError, msg % r'float'):
            os.stat(FakePath(2.34))
        with self.assertRaisesRegex(TypeError, msg % r'object'):
            os.stat(FakePath(object()))
3542
Brett Cannon3f9183b2016-08-26 14:44:48 -07003543
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip the blocking flag through os.set/get_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to be blocking.
        self.assertEqual(os.get_blocking(fd), True)

        os.set_blocking(fd, False)
        self.assertEqual(os.get_blocking(fd), False)

        os.set_blocking(fd, True)
        self.assertEqual(os.get_blocking(fd), True)
3557
3558
Yury Selivanov97e2e062014-09-26 12:33:06 -04003559
class ExportsTests(unittest.TestCase):
    """Check that selected names are listed in os.__all__."""

    def test_os_all(self):
        self.assertIn('open', os.__all__)
        self.assertIn('walk', os.__all__)
3564
3565
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    setUp() creates a scratch directory at the real path of
    support.TESTFN and registers its removal as a cleanup.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create *name* inside the scratch directory and return its full
        # path; the result is bytes when *name* is bytes, str otherwise.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that the sorted entry names
        # equal *names*, and return a {name: DirEntry} mapping.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # With skip_fields true, compare every st_* attribute except
        # st_dev, st_ino and st_nlink; otherwise compare the results whole.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Cross-check a DirEntry against os.stat()/os.path for the same
        # path.  NOTE(review): is_dir and is_file are accepted but never
        # asserted here; only is_symlink is used, to pick the stat
        # comparison mode on Windows.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the single DirEntry of the scratch directory, scanning
        # with a bytes path when *name* is bytes.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(self.bytes_path, bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Remove the target so the symlink becomes dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        entries = list(os.scandir(self.bytes_path))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(self.bytes_path)
            # Bytes-like (non-bytes) paths are accepted with a
            # DeprecationWarning and yield plain bytes names/paths.
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning by fd, entry.path is just the entry name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # A partially consumed, unclosed iterator must warn when dropped.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3898
Victor Stinner6036e442015-03-08 01:58:04 +01003899
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and os.PathLike."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for b in b'hello', b'goodbye', b'some/path/and/file':
            self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        for s in 'hello', 'goodbye', 'some/path/and/file':
            self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        for p in "path/like/object", b"path/like/object":
            pathlike = FakePath(p)

            self.assertEqual(p, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertIsInstance(FakePath('x'), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003945
Victor Stinnerc29b5852017-11-02 07:28:27 -07003946
class TimesTests(unittest.TestCase):
    """Check the shape of the os.times() result."""

    def test_times(self):
        times = os.times()
        self.assertIsInstance(times, os.times_result)

        for field in ('user', 'system', 'children_user', 'children_system',
                      'elapsed'):
            value = getattr(times, field)
            self.assertIsInstance(value, float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            self.assertEqual(times.children_user, 0)
            self.assertEqual(times.children_system, 0)
            self.assertEqual(times.elapsed, 0)
3961
3962
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003971
3972
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()