blob: 8ff0296fad0bb4cd519d57614b869ef55569083b [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
R David Murrayf2ad1732014-12-25 18:36:56 -050060root_in_posix = False
61if hasattr(os, 'geteuid'):
62 root_in_posix = (os.geteuid() == 0)
63
Mark Dickinson7cf03892010-04-16 13:45:35 +000064# Detect whether we're on a Linux system that uses the (now outdated
65# and unmaintained) linuxthreads threading library. There's an issue
66# when combining linuxthreads with a failed execv call: see
67# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020068if hasattr(sys, 'thread_info') and sys.thread_info.version:
69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
70else:
71 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000072
Stefan Krahebee49a2013-01-17 15:31:00 +010073# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
Berker Peksag4af23d72016-09-15 20:32:44 +030077def requires_os_func(name):
78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
79
80
Victor Stinnerae39d232016-03-24 17:12:55 +010081def create_file(filename, content=b'content'):
82 with open(filename, "xb", 0) as fp:
83 fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
151class FileTests(unittest.TestCase):
152 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000153 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000155 tearDown = setUp
156
157 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000159 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000160 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161
Christian Heimesfdab48e2008-01-20 09:06:41 +0000162 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
164 # We must allocate two consecutive file descriptors, otherwise
165 # it will mess up other file descriptors (perhaps even the three
166 # standard ones).
167 second = os.dup(first)
168 try:
169 retries = 0
170 while second != first + 1:
171 os.close(first)
172 retries += 1
173 if retries > 10:
174 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000175 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000176 first, second = second, os.dup(second)
177 finally:
178 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000179 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000180 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000181 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000182
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000183 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000184 def test_rename(self):
185 path = support.TESTFN
186 old = sys.getrefcount(path)
187 self.assertRaises(TypeError, os.rename, path, 0)
188 new = sys.getrefcount(path)
189 self.assertEqual(old, new)
190
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000191 def test_read(self):
192 with open(support.TESTFN, "w+b") as fobj:
193 fobj.write(b"spam")
194 fobj.flush()
195 fd = fobj.fileno()
196 os.lseek(fd, 0, 0)
197 s = os.read(fd, 4)
198 self.assertEqual(type(s), bytes)
199 self.assertEqual(s, b"spam")
200
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200201 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200202 # Skip the test on 32-bit platforms: the number of bytes must fit in a
203 # Py_ssize_t type
204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
205 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
207 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200208 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200210
211 # Issue #21932: Make sure that os.read() does not raise an
212 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213 with open(support.TESTFN, "rb") as fp:
214 data = os.read(fp.fileno(), size)
215
Victor Stinner8c663fd2017-11-08 14:44:44 -0800216 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200217 # operating system is free to return less bytes than requested.
218 self.assertEqual(data, b'test')
219
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000220 def test_write(self):
221 # os.write() accepts bytes- and buffer-like objects but not strings
222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
223 self.assertRaises(TypeError, os.write, fd, "beans")
224 os.write(fd, b"bacon\n")
225 os.write(fd, bytearray(b"eggs\n"))
226 os.write(fd, memoryview(b"spam\n"))
227 os.close(fd)
228 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000229 self.assertEqual(fobj.read().splitlines(),
230 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000231
Victor Stinnere0daff12011-03-20 23:36:35 +0100232 def write_windows_console(self, *args):
233 retcode = subprocess.call(args,
234 # use a new console to not flood the test output
235 creationflags=subprocess.CREATE_NEW_CONSOLE,
236 # use a shell to hide the console window (SW_HIDE)
237 shell=True)
238 self.assertEqual(retcode, 0)
239
240 @unittest.skipUnless(sys.platform == 'win32',
241 'test specific to the Windows console')
242 def test_write_windows_console(self):
243 # Issue #11395: the Windows console returns an error (12: not enough
244 # space error) on writing into stdout if stdout mode is binary and the
245 # length is greater than 66,000 bytes (or less, depending on heap
246 # usage).
247 code = "print('x' * 100000)"
248 self.write_windows_console(sys.executable, "-c", code)
249 self.write_windows_console(sys.executable, "-u", "-c", code)
250
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000251 def fdopen_helper(self, *args):
252 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253 f = os.fdopen(fd, *args)
254 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000255
256 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
258 os.close(fd)
259
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000260 self.fdopen_helper()
261 self.fdopen_helper('r')
262 self.fdopen_helper('r', 100)
263
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100264 def test_replace(self):
265 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100266 self.addCleanup(support.unlink, support.TESTFN)
267 self.addCleanup(support.unlink, TESTFN2)
268
269 create_file(support.TESTFN, b"1")
270 create_file(TESTFN2, b"2")
271
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100272 os.replace(support.TESTFN, TESTFN2)
273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
274 with open(TESTFN2, 'r') as f:
275 self.assertEqual(f.read(), "1")
276
Martin Panterbf19d162015-09-09 01:01:13 +0000277 def test_open_keywords(self):
278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
279 dir_fd=None)
280 os.close(f)
281
282 def test_symlink_keywords(self):
283 symlink = support.get_attribute(os, "symlink")
284 try:
285 symlink(src='target', dst=support.TESTFN,
286 target_is_directory=False, dir_fd=None)
287 except (NotImplementedError, OSError):
288 pass # No OS support or unprivileged user
289
Pablo Galindoaac4d032019-05-31 19:39:47 +0100290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
291 def test_copy_file_range_invalid_values(self):
292 with self.assertRaises(ValueError):
293 os.copy_file_range(0, 1, -10)
294
295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
296 def test_copy_file_range(self):
297 TESTFN2 = support.TESTFN + ".3"
298 data = b'0123456789'
299
300 create_file(support.TESTFN, data)
301 self.addCleanup(support.unlink, support.TESTFN)
302
303 in_file = open(support.TESTFN, 'rb')
304 self.addCleanup(in_file.close)
305 in_fd = in_file.fileno()
306
307 out_file = open(TESTFN2, 'w+b')
308 self.addCleanup(support.unlink, TESTFN2)
309 self.addCleanup(out_file.close)
310 out_fd = out_file.fileno()
311
312 try:
313 i = os.copy_file_range(in_fd, out_fd, 5)
314 except OSError as e:
315 # Handle the case in which Python was compiled
316 # in a system with the syscall but without support
317 # in the kernel.
318 if e.errno != errno.ENOSYS:
319 raise
320 self.skipTest(e)
321 else:
322 # The number of copied bytes can be less than
323 # the number of bytes originally requested.
324 self.assertIn(i, range(0, 6));
325
326 with open(TESTFN2, 'rb') as in_file:
327 self.assertEqual(in_file.read(), data[:i])
328
329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
330 def test_copy_file_range_offset(self):
331 TESTFN4 = support.TESTFN + ".4"
332 data = b'0123456789'
333 bytes_to_copy = 6
334 in_skip = 3
335 out_seek = 5
336
337 create_file(support.TESTFN, data)
338 self.addCleanup(support.unlink, support.TESTFN)
339
340 in_file = open(support.TESTFN, 'rb')
341 self.addCleanup(in_file.close)
342 in_fd = in_file.fileno()
343
344 out_file = open(TESTFN4, 'w+b')
345 self.addCleanup(support.unlink, TESTFN4)
346 self.addCleanup(out_file.close)
347 out_fd = out_file.fileno()
348
349 try:
350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
351 offset_src=in_skip,
352 offset_dst=out_seek)
353 except OSError as e:
354 # Handle the case in which Python was compiled
355 # in a system with the syscall but without support
356 # in the kernel.
357 if e.errno != errno.ENOSYS:
358 raise
359 self.skipTest(e)
360 else:
361 # The number of copied bytes can be less than
362 # the number of bytes originally requested.
363 self.assertIn(i, range(0, bytes_to_copy+1));
364
365 with open(TESTFN4, 'rb') as in_file:
366 read = in_file.read()
367 # seeked bytes (5) are zero'ed
368 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
369 # 012 are skipped (in_skip)
370 # 345678 are copied in the file (in_skip + bytes_to_copy)
371 self.assertEqual(read[out_seek:],
372 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Steve Dower772ec0f2019-09-04 14:42:54 -0700594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
595 def test_stat_block_device(self):
596 # bpo-38030: os.stat fails for block devices
597 # Test a filename like "//./C:"
598 fname = "//./" + os.path.splitdrive(os.getcwd())[0]
599 result = os.stat(fname)
600 self.assertEqual(result.st_mode, stat.S_IFBLK)
601
Victor Stinner47aacc82015-06-12 17:26:23 +0200602
603class UtimeTests(unittest.TestCase):
604 def setUp(self):
605 self.dirname = support.TESTFN
606 self.fname = os.path.join(self.dirname, "f1")
607
608 self.addCleanup(support.rmtree, self.dirname)
609 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100610 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200611
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 def support_subsecond(self, filename):
613 # Heuristic to check if the filesystem supports timestamp with
614 # subsecond resolution: check if float and int timestamps are different
615 st = os.stat(filename)
616 return ((st.st_atime != st[7])
617 or (st.st_mtime != st[8])
618 or (st.st_ctime != st[9]))
619
620 def _test_utime(self, set_time, filename=None):
621 if not filename:
622 filename = self.fname
623
624 support_subsecond = self.support_subsecond(filename)
625 if support_subsecond:
626 # Timestamp with a resolution of 1 microsecond (10^-6).
627 #
628 # The resolution of the C internal function used by os.utime()
629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
630 # test with a resolution of 1 ns requires more work:
631 # see the issue #15745.
632 atime_ns = 1002003000 # 1.002003 seconds
633 mtime_ns = 4005006000 # 4.005006 seconds
634 else:
635 # use a resolution of 1 second
636 atime_ns = 5 * 10**9
637 mtime_ns = 8 * 10**9
638
639 set_time(filename, (atime_ns, mtime_ns))
640 st = os.stat(filename)
641
642 if support_subsecond:
643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
645 else:
646 self.assertEqual(st.st_atime, atime_ns * 1e-9)
647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
648 self.assertEqual(st.st_atime_ns, atime_ns)
649 self.assertEqual(st.st_mtime_ns, mtime_ns)
650
651 def test_utime(self):
652 def set_time(filename, ns):
653 # test the ns keyword parameter
654 os.utime(filename, ns=ns)
655 self._test_utime(set_time)
656
657 @staticmethod
658 def ns_to_sec(ns):
659 # Convert a number of nanosecond (int) to a number of seconds (float).
660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
661 # issue, os.utime() rounds towards minus infinity.
662 return (ns * 1e-9) + 0.5e-9
663
664 def test_utime_by_indexed(self):
665 # pass times as floating point seconds as the second indexed parameter
666 def set_time(filename, ns):
667 atime_ns, mtime_ns = ns
668 atime = self.ns_to_sec(atime_ns)
669 mtime = self.ns_to_sec(mtime_ns)
670 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
671 # or utime(time_t)
672 os.utime(filename, (atime, mtime))
673 self._test_utime(set_time)
674
675 def test_utime_by_times(self):
676 def set_time(filename, ns):
677 atime_ns, mtime_ns = ns
678 atime = self.ns_to_sec(atime_ns)
679 mtime = self.ns_to_sec(mtime_ns)
680 # test the times keyword parameter
681 os.utime(filename, times=(atime, mtime))
682 self._test_utime(set_time)
683
684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
685 "follow_symlinks support for utime required "
686 "for this test.")
687 def test_utime_nofollow_symlinks(self):
688 def set_time(filename, ns):
689 # use follow_symlinks=False to test utimensat(timespec)
690 # or lutimes(timeval)
691 os.utime(filename, ns=ns, follow_symlinks=False)
692 self._test_utime(set_time)
693
694 @unittest.skipUnless(os.utime in os.supports_fd,
695 "fd support for utime required for this test.")
696 def test_utime_fd(self):
697 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100698 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200699 # use a file descriptor to test futimens(timespec)
700 # or futimes(timeval)
701 os.utime(fp.fileno(), ns=ns)
702 self._test_utime(set_time)
703
704 @unittest.skipUnless(os.utime in os.supports_dir_fd,
705 "dir_fd support for utime required for this test.")
706 def test_utime_dir_fd(self):
707 def set_time(filename, ns):
708 dirname, name = os.path.split(filename)
709 dirfd = os.open(dirname, os.O_RDONLY)
710 try:
711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
712 os.utime(name, dir_fd=dirfd, ns=ns)
713 finally:
714 os.close(dirfd)
715 self._test_utime(set_time)
716
717 def test_utime_directory(self):
718 def set_time(filename, ns):
719 # test calling os.utime() on a directory
720 os.utime(filename, ns=ns)
721 self._test_utime(set_time, filename=self.dirname)
722
723 def _test_utime_current(self, set_time):
724 # Get the system clock
725 current = time.time()
726
727 # Call os.utime() to set the timestamp to the current system clock
728 set_time(self.fname)
729
730 if not self.support_subsecond(self.fname):
731 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700732 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200733 # On Windows, the usual resolution of time.time() is 15.6 ms.
734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700735 #
736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
737 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200738 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200739 st = os.stat(self.fname)
740 msg = ("st_time=%r, current=%r, dt=%r"
741 % (st.st_mtime, current, st.st_mtime - current))
742 self.assertAlmostEqual(st.st_mtime, current,
743 delta=delta, msg=msg)
744
745 def test_utime_current(self):
746 def set_time(filename):
747 # Set to the current time in the new way
748 os.utime(self.fname)
749 self._test_utime_current(set_time)
750
751 def test_utime_current_old(self):
752 def set_time(filename):
753 # Set to the current time in the old explicit way.
754 os.utime(self.fname, None)
755 self._test_utime_current(set_time)
756
757 def get_file_system(self, path):
758 if sys.platform == 'win32':
759 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
760 import ctypes
761 kernel32 = ctypes.windll.kernel32
762 buf = ctypes.create_unicode_buffer("", 100)
763 ok = kernel32.GetVolumeInformationW(root, None, 0,
764 None, None, None,
765 buf, len(buf))
766 if ok:
767 return buf.value
768 # return None if the filesystem is unknown
769
770 def test_large_time(self):
771 # Many filesystems are limited to the year 2038. At least, the test
772 # pass with NTFS filesystem.
773 if self.get_file_system(self.dirname) != "NTFS":
774 self.skipTest("requires NTFS")
775
776 large = 5000000000 # some day in 2128
777 os.utime(self.fname, (large, large))
778 self.assertEqual(os.stat(self.fname).st_mtime, large)
779
780 def test_utime_invalid_arguments(self):
781 # seconds and nanoseconds parameters are mutually exclusive
782 with self.assertRaises(ValueError):
783 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200784 with self.assertRaises(TypeError):
785 os.utime(self.fname, [5, 5])
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, (5,))
788 with self.assertRaises(TypeError):
789 os.utime(self.fname, (5, 5, 5))
790 with self.assertRaises(TypeError):
791 os.utime(self.fname, ns=[5, 5])
792 with self.assertRaises(TypeError):
793 os.utime(self.fname, ns=(5,))
794 with self.assertRaises(TypeError):
795 os.utime(self.fname, ns=(5, 5, 5))
796
797 if os.utime not in os.supports_follow_symlinks:
798 with self.assertRaises(NotImplementedError):
799 os.utime(self.fname, (5, 5), follow_symlinks=False)
800 if os.utime not in os.supports_fd:
801 with open(self.fname, 'wb', 0) as fp:
802 with self.assertRaises(TypeError):
803 os.utime(fp.fileno(), (5, 5))
804 if os.utime not in os.supports_dir_fd:
805 with self.assertRaises(NotImplementedError):
806 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200807
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300808 @support.cpython_only
809 def test_issue31577(self):
810 # The interpreter shouldn't crash in case utime() received a bad
811 # ns argument.
812 def get_bad_int(divmod_ret_val):
813 class BadInt:
814 def __divmod__(*args):
815 return divmod_ret_val
816 return BadInt()
817 with self.assertRaises(TypeError):
818 os.utime(self.fname, ns=(get_bad_int(42), 1))
819 with self.assertRaises(TypeError):
820 os.utime(self.fname, ns=(get_bad_int(()), 1))
821 with self.assertRaises(TypeError):
822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
Victor Stinner47aacc82015-06-12 17:26:23 +0200824
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000825from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000826
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000827class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000828 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000829 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000830
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000831 def setUp(self):
832 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000834 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000835 for key, value in self._reference().items():
836 os.environ[key] = value
837
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000838 def tearDown(self):
839 os.environ.clear()
840 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000841 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000842 os.environb.clear()
843 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000844
Christian Heimes90333392007-11-01 19:08:42 +0000845 def _reference(self):
846 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
847
848 def _empty_mapping(self):
849 os.environ.clear()
850 return os.environ
851
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000852 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200853 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
854 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000855 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000856 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300857 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200858 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300859 value = popen.read().strip()
860 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000861
Xavier de Gayed1415312016-07-22 12:15:29 +0200862 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
863 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000864 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200865 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
866 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300867 it = iter(popen)
868 self.assertEqual(next(it), "line1\n")
869 self.assertEqual(next(it), "line2\n")
870 self.assertEqual(next(it), "line3\n")
871 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000872
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000873 # Verify environ keys and values from the OS are of the
874 # correct str type.
875 def test_keyvalue_types(self):
876 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000877 self.assertEqual(type(key), str)
878 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000879
Christian Heimes90333392007-11-01 19:08:42 +0000880 def test_items(self):
881 for key, value in self._reference().items():
882 self.assertEqual(os.environ.get(key), value)
883
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000884 # Issue 7310
885 def test___repr__(self):
886 """Check that the repr() of os.environ looks like environ({...})."""
887 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000888 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
889 '{!r}: {!r}'.format(key, value)
890 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000891
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000892 def test_get_exec_path(self):
893 defpath_list = os.defpath.split(os.pathsep)
894 test_path = ['/monty', '/python', '', '/flying/circus']
895 test_env = {'PATH': os.pathsep.join(test_path)}
896
897 saved_environ = os.environ
898 try:
899 os.environ = dict(test_env)
900 # Test that defaulting to os.environ works.
901 self.assertSequenceEqual(test_path, os.get_exec_path())
902 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
903 finally:
904 os.environ = saved_environ
905
906 # No PATH environment variable
907 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
908 # Empty PATH environment variable
909 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
910 # Supplied PATH environment variable
911 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
912
Victor Stinnerb745a742010-05-18 17:17:23 +0000913 if os.supports_bytes_environ:
914 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000915 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000916 # ignore BytesWarning warning
917 with warnings.catch_warnings(record=True):
918 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000919 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000920 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000921 pass
922 else:
923 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000924
925 # bytes key and/or value
926 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
927 ['abc'])
928 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
929 ['abc'])
930 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
931 ['abc'])
932
933 @unittest.skipUnless(os.supports_bytes_environ,
934 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000935 def test_environb(self):
936 # os.environ -> os.environb
937 value = 'euro\u20ac'
938 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000939 value_bytes = value.encode(sys.getfilesystemencoding(),
940 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000941 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000942 msg = "U+20AC character is not encodable to %s" % (
943 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000944 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000945 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(os.environ['unicode'], value)
947 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000948
949 # os.environb -> os.environ
950 value = b'\xff'
951 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000953 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000954 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000955
Victor Stinner13ff2452018-01-22 18:32:50 +0100956 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100957 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100958 def test_unset_error(self):
959 if sys.platform == "win32":
960 # an environment variable is limited to 32,767 characters
961 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100962 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100963 else:
964 # "=" is not allowed in a variable name
965 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100966 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100967
Victor Stinner6d101392013-04-14 16:35:04 +0200968 def test_key_type(self):
969 missing = 'missingkey'
970 self.assertNotIn(missing, os.environ)
971
Victor Stinner839e5ea2013-04-14 16:43:03 +0200972 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200973 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200974 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200975 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200976
Victor Stinner839e5ea2013-04-14 16:43:03 +0200977 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200978 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200979 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200980 self.assertTrue(cm.exception.__suppress_context__)
981
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300982 def _test_environ_iteration(self, collection):
983 iterator = iter(collection)
984 new_key = "__new_key__"
985
986 next(iterator) # start iteration over os.environ.items
987
988 # add a new key in os.environ mapping
989 os.environ[new_key] = "test_environ_iteration"
990
991 try:
992 next(iterator) # force iteration over modified mapping
993 self.assertEqual(os.environ[new_key], "test_environ_iteration")
994 finally:
995 del os.environ[new_key]
996
997 def test_iter_error_when_changing_os_environ(self):
998 self._test_environ_iteration(os.environ)
999
1000 def test_iter_error_when_changing_os_environ_items(self):
1001 self._test_environ_iteration(os.environ.items())
1002
1003 def test_iter_error_when_changing_os_environ_values(self):
1004 self._test_environ_iteration(os.environ.values())
1005
Victor Stinner6d101392013-04-14 16:35:04 +02001006
Tim Petersc4e09402003-04-25 07:11:48 +00001007class WalkTests(unittest.TestCase):
1008 """Tests for os.walk()."""
1009
Victor Stinner0561c532015-03-12 10:28:24 +01001010 # Wrapper to hide minor differences between os.walk and os.fwalk
1011 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001012 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001013 if 'follow_symlinks' in kwargs:
1014 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001015 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001016
Charles-François Natali7372b062012-02-05 15:15:38 +01001017 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001018 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001019 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001020
1021 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001022 # TESTFN/
1023 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001024 # tmp1
1025 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001026 # tmp2
1027 # SUB11/ no kids
1028 # SUB2/ a file kid and a dirsymlink kid
1029 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001030 # SUB21/ not readable
1031 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001032 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001033 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001034 # broken_link2
1035 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001036 # TEST2/
1037 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001038 self.walk_path = join(support.TESTFN, "TEST1")
1039 self.sub1_path = join(self.walk_path, "SUB1")
1040 self.sub11_path = join(self.sub1_path, "SUB11")
1041 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001042 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001043 tmp1_path = join(self.walk_path, "tmp1")
1044 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001045 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001046 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001047 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001048 t2_path = join(support.TESTFN, "TEST2")
1049 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001050 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001051 broken_link2_path = join(sub2_path, "broken_link2")
1052 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001053
1054 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001055 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001056 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001057 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001058 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001059
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001060 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001061 with open(path, "x") as f:
1062 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001063
Victor Stinner0561c532015-03-12 10:28:24 +01001064 if support.can_symlink():
1065 os.symlink(os.path.abspath(t2_path), self.link_path)
1066 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001067 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1068 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001069 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001070 ["broken_link", "broken_link2", "broken_link3",
1071 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001072 else:
pxinwr3e028b22019-02-15 13:04:47 +08001073 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001074
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001075 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001076 try:
1077 os.listdir(sub21_path)
1078 except PermissionError:
1079 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1080 else:
1081 os.chmod(sub21_path, stat.S_IRWXU)
1082 os.unlink(tmp5_path)
1083 os.rmdir(sub21_path)
1084 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001085
Victor Stinner0561c532015-03-12 10:28:24 +01001086 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001087 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001088 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001089
Tim Petersc4e09402003-04-25 07:11:48 +00001090 self.assertEqual(len(all), 4)
1091 # We can't know which order SUB1 and SUB2 will appear in.
1092 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1093 # flipped: TESTFN, SUB2, SUB1, SUB11
1094 flipped = all[0][1][0] != "SUB1"
1095 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001096 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001097 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001098 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1099 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1100 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1101 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001102
Brett Cannon3f9183b2016-08-26 14:44:48 -07001103 def test_walk_prune(self, walk_path=None):
1104 if walk_path is None:
1105 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001106 # Prune the search.
1107 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001108 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001109 all.append((root, dirs, files))
1110 # Don't descend into SUB1.
1111 if 'SUB1' in dirs:
1112 # Note that this also mutates the dirs we appended to all!
1113 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001114
Victor Stinner0561c532015-03-12 10:28:24 +01001115 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001116 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001117
1118 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001119 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001120 self.assertEqual(all[1], self.sub2_tree)
1121
Brett Cannon3f9183b2016-08-26 14:44:48 -07001122 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001123 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001124
Victor Stinner0561c532015-03-12 10:28:24 +01001125 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001126 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001127 all = list(self.walk(self.walk_path, topdown=False))
1128
Victor Stinner53b0a412016-03-26 01:12:36 +01001129 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001130 # We can't know which order SUB1 and SUB2 will appear in.
1131 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1132 # flipped: SUB2, SUB11, SUB1, TESTFN
1133 flipped = all[3][1][0] != "SUB1"
1134 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001135 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001136 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001137 self.assertEqual(all[3],
1138 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1139 self.assertEqual(all[flipped],
1140 (self.sub11_path, [], []))
1141 self.assertEqual(all[flipped + 1],
1142 (self.sub1_path, ["SUB11"], ["tmp2"]))
1143 self.assertEqual(all[2 - 2 * flipped],
1144 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001145
Victor Stinner0561c532015-03-12 10:28:24 +01001146 def test_walk_symlink(self):
1147 if not support.can_symlink():
1148 self.skipTest("need symlink support")
1149
1150 # Walk, following symlinks.
1151 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1152 for root, dirs, files in walk_it:
1153 if root == self.link_path:
1154 self.assertEqual(dirs, [])
1155 self.assertEqual(files, ["tmp4"])
1156 break
1157 else:
1158 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001159
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001160 def test_walk_bad_dir(self):
1161 # Walk top-down.
1162 errors = []
1163 walk_it = self.walk(self.walk_path, onerror=errors.append)
1164 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001165 self.assertEqual(errors, [])
1166 dir1 = 'SUB1'
1167 path1 = os.path.join(root, dir1)
1168 path1new = os.path.join(root, dir1 + '.new')
1169 os.rename(path1, path1new)
1170 try:
1171 roots = [r for r, d, f in walk_it]
1172 self.assertTrue(errors)
1173 self.assertNotIn(path1, roots)
1174 self.assertNotIn(path1new, roots)
1175 for dir2 in dirs:
1176 if dir2 != dir1:
1177 self.assertIn(os.path.join(root, dir2), roots)
1178 finally:
1179 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001180
Charles-François Natali7372b062012-02-05 15:15:38 +01001181
1182@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1183class FwalkTests(WalkTests):
1184 """Tests for os.fwalk()."""
1185
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001186 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001187 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001188 yield (root, dirs, files)
1189
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001190 def fwalk(self, *args, **kwargs):
1191 return os.fwalk(*args, **kwargs)
1192
Larry Hastingsc48fe982012-06-25 04:49:05 -07001193 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1194 """
1195 compare with walk() results.
1196 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001197 walk_kwargs = walk_kwargs.copy()
1198 fwalk_kwargs = fwalk_kwargs.copy()
1199 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1200 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1201 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001202
Charles-François Natali7372b062012-02-05 15:15:38 +01001203 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001204 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001205 expected[root] = (set(dirs), set(files))
1206
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001207 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001208 self.assertIn(root, expected)
1209 self.assertEqual(expected[root], (set(dirs), set(files)))
1210
Larry Hastingsc48fe982012-06-25 04:49:05 -07001211 def test_compare_to_walk(self):
1212 kwargs = {'top': support.TESTFN}
1213 self._compare_to_walk(kwargs, kwargs)
1214
Charles-François Natali7372b062012-02-05 15:15:38 +01001215 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001216 try:
1217 fd = os.open(".", os.O_RDONLY)
1218 walk_kwargs = {'top': support.TESTFN}
1219 fwalk_kwargs = walk_kwargs.copy()
1220 fwalk_kwargs['dir_fd'] = fd
1221 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1222 finally:
1223 os.close(fd)
1224
1225 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001226 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001227 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1228 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001229 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001230 # check that the FD is valid
1231 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001232 # redundant check
1233 os.stat(rootfd)
1234 # check that listdir() returns consistent information
1235 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001236
1237 def test_fd_leak(self):
1238 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1239 # we both check that calling fwalk() a large number of times doesn't
1240 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1241 minfd = os.dup(1)
1242 os.close(minfd)
1243 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001244 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001245 pass
1246 newfd = os.dup(1)
1247 self.addCleanup(os.close, newfd)
1248 self.assertEqual(newfd, minfd)
1249
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001250class BytesWalkTests(WalkTests):
1251 """Tests for os.walk() with bytes."""
1252 def walk(self, top, **kwargs):
1253 if 'follow_symlinks' in kwargs:
1254 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1255 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1256 root = os.fsdecode(broot)
1257 dirs = list(map(os.fsdecode, bdirs))
1258 files = list(map(os.fsdecode, bfiles))
1259 yield (root, dirs, files)
1260 bdirs[:] = list(map(os.fsencode, dirs))
1261 bfiles[:] = list(map(os.fsencode, files))
1262
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001263@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1264class BytesFwalkTests(FwalkTests):
1265 """Tests for os.walk() with bytes."""
1266 def fwalk(self, top='.', *args, **kwargs):
1267 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1268 root = os.fsdecode(broot)
1269 dirs = list(map(os.fsdecode, bdirs))
1270 files = list(map(os.fsdecode, bfiles))
1271 yield (root, dirs, files, topfd)
1272 bdirs[:] = list(map(os.fsencode, dirs))
1273 bfiles[:] = list(map(os.fsencode, files))
1274
Charles-François Natali7372b062012-02-05 15:15:38 +01001275
Guido van Rossume7ba4952007-06-06 23:52:48 +00001276class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001277 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001278 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001279
1280 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001281 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001282 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1283 os.makedirs(path) # Should work
1284 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1285 os.makedirs(path)
1286
1287 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001288 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001289 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1290 os.makedirs(path)
1291 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1292 'dir5', 'dir6')
1293 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001294
Serhiy Storchakae304e332017-03-24 13:27:42 +02001295 def test_mode(self):
1296 with support.temp_umask(0o002):
1297 base = support.TESTFN
1298 parent = os.path.join(base, 'dir1')
1299 path = os.path.join(parent, 'dir2')
1300 os.makedirs(path, 0o555)
1301 self.assertTrue(os.path.exists(path))
1302 self.assertTrue(os.path.isdir(path))
1303 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001304 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1305 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001306
Terry Reedy5a22b652010-12-02 07:05:56 +00001307 def test_exist_ok_existing_directory(self):
1308 path = os.path.join(support.TESTFN, 'dir1')
1309 mode = 0o777
1310 old_mask = os.umask(0o022)
1311 os.makedirs(path, mode)
1312 self.assertRaises(OSError, os.makedirs, path, mode)
1313 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001314 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001315 os.makedirs(path, mode=mode, exist_ok=True)
1316 os.umask(old_mask)
1317
Martin Pantera82642f2015-11-19 04:48:44 +00001318 # Issue #25583: A drive root could raise PermissionError on Windows
1319 os.makedirs(os.path.abspath('/'), exist_ok=True)
1320
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001321 def test_exist_ok_s_isgid_directory(self):
1322 path = os.path.join(support.TESTFN, 'dir1')
1323 S_ISGID = stat.S_ISGID
1324 mode = 0o777
1325 old_mask = os.umask(0o022)
1326 try:
1327 existing_testfn_mode = stat.S_IMODE(
1328 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001329 try:
1330 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001331 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001332 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001333 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1334 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1335 # The os should apply S_ISGID from the parent dir for us, but
1336 # this test need not depend on that behavior. Be explicit.
1337 os.makedirs(path, mode | S_ISGID)
1338 # http://bugs.python.org/issue14992
1339 # Should not fail when the bit is already set.
1340 os.makedirs(path, mode, exist_ok=True)
1341 # remove the bit.
1342 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001343 # May work even when the bit is not already set when demanded.
1344 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001345 finally:
1346 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001347
1348 def test_exist_ok_existing_regular_file(self):
1349 base = support.TESTFN
1350 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001351 with open(path, 'w') as f:
1352 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001353 self.assertRaises(OSError, os.makedirs, path)
1354 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1355 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1356 os.remove(path)
1357
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001358 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001359 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001360 'dir4', 'dir5', 'dir6')
1361 # If the tests failed, the bottom-most directory ('../dir6')
1362 # may not have been created, so we look for the outermost directory
1363 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001364 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001365 path = os.path.dirname(path)
1366
1367 os.removedirs(path)
1368
Andrew Svetlov405faed2012-12-25 12:18:09 +02001369
R David Murrayf2ad1732014-12-25 18:36:56 -05001370@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1371class ChownFileTests(unittest.TestCase):
1372
Berker Peksag036a71b2015-07-21 09:29:48 +03001373 @classmethod
1374 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001375 os.mkdir(support.TESTFN)
1376
1377 def test_chown_uid_gid_arguments_must_be_index(self):
1378 stat = os.stat(support.TESTFN)
1379 uid = stat.st_uid
1380 gid = stat.st_gid
1381 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1382 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1383 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1384 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1385 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1386
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001387 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1388 def test_chown_gid(self):
1389 groups = os.getgroups()
1390 if len(groups) < 2:
1391 self.skipTest("test needs at least 2 groups")
1392
R David Murrayf2ad1732014-12-25 18:36:56 -05001393 gid_1, gid_2 = groups[:2]
1394 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001395
R David Murrayf2ad1732014-12-25 18:36:56 -05001396 os.chown(support.TESTFN, uid, gid_1)
1397 gid = os.stat(support.TESTFN).st_gid
1398 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001399
R David Murrayf2ad1732014-12-25 18:36:56 -05001400 os.chown(support.TESTFN, uid, gid_2)
1401 gid = os.stat(support.TESTFN).st_gid
1402 self.assertEqual(gid, gid_2)
1403
1404 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1405 "test needs root privilege and more than one user")
1406 def test_chown_with_root(self):
1407 uid_1, uid_2 = all_users[:2]
1408 gid = os.stat(support.TESTFN).st_gid
1409 os.chown(support.TESTFN, uid_1, gid)
1410 uid = os.stat(support.TESTFN).st_uid
1411 self.assertEqual(uid, uid_1)
1412 os.chown(support.TESTFN, uid_2, gid)
1413 uid = os.stat(support.TESTFN).st_uid
1414 self.assertEqual(uid, uid_2)
1415
1416 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1417 "test needs non-root account and more than one user")
1418 def test_chown_without_permission(self):
1419 uid_1, uid_2 = all_users[:2]
1420 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001421 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001422 os.chown(support.TESTFN, uid_1, gid)
1423 os.chown(support.TESTFN, uid_2, gid)
1424
Berker Peksag036a71b2015-07-21 09:29:48 +03001425 @classmethod
1426 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001427 os.rmdir(support.TESTFN)
1428
1429
Andrew Svetlov405faed2012-12-25 12:18:09 +02001430class RemoveDirsTests(unittest.TestCase):
1431 def setUp(self):
1432 os.makedirs(support.TESTFN)
1433
1434 def tearDown(self):
1435 support.rmtree(support.TESTFN)
1436
1437 def test_remove_all(self):
1438 dira = os.path.join(support.TESTFN, 'dira')
1439 os.mkdir(dira)
1440 dirb = os.path.join(dira, 'dirb')
1441 os.mkdir(dirb)
1442 os.removedirs(dirb)
1443 self.assertFalse(os.path.exists(dirb))
1444 self.assertFalse(os.path.exists(dira))
1445 self.assertFalse(os.path.exists(support.TESTFN))
1446
1447 def test_remove_partial(self):
1448 dira = os.path.join(support.TESTFN, 'dira')
1449 os.mkdir(dira)
1450 dirb = os.path.join(dira, 'dirb')
1451 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001452 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001453 os.removedirs(dirb)
1454 self.assertFalse(os.path.exists(dirb))
1455 self.assertTrue(os.path.exists(dira))
1456 self.assertTrue(os.path.exists(support.TESTFN))
1457
1458 def test_remove_nothing(self):
1459 dira = os.path.join(support.TESTFN, 'dira')
1460 os.mkdir(dira)
1461 dirb = os.path.join(dira, 'dirb')
1462 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001463 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001464 with self.assertRaises(OSError):
1465 os.removedirs(dirb)
1466 self.assertTrue(os.path.exists(dirb))
1467 self.assertTrue(os.path.exists(dira))
1468 self.assertTrue(os.path.exists(support.TESTFN))
1469
1470
Guido van Rossume7ba4952007-06-06 23:52:48 +00001471class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001472 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001473 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001474 f.write(b'hello')
1475 f.close()
1476 with open(os.devnull, 'rb') as f:
1477 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001478
Andrew Svetlov405faed2012-12-25 12:18:09 +02001479
Guido van Rossume7ba4952007-06-06 23:52:48 +00001480class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001481 def test_urandom_length(self):
1482 self.assertEqual(len(os.urandom(0)), 0)
1483 self.assertEqual(len(os.urandom(1)), 1)
1484 self.assertEqual(len(os.urandom(10)), 10)
1485 self.assertEqual(len(os.urandom(100)), 100)
1486 self.assertEqual(len(os.urandom(1000)), 1000)
1487
1488 def test_urandom_value(self):
1489 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001490 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001491 data2 = os.urandom(16)
1492 self.assertNotEqual(data1, data2)
1493
1494 def get_urandom_subprocess(self, count):
1495 code = '\n'.join((
1496 'import os, sys',
1497 'data = os.urandom(%s)' % count,
1498 'sys.stdout.buffer.write(data)',
1499 'sys.stdout.buffer.flush()'))
1500 out = assert_python_ok('-c', code)
1501 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001502 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001503 return stdout
1504
1505 def test_urandom_subprocess(self):
1506 data1 = self.get_urandom_subprocess(16)
1507 data2 = self.get_urandom_subprocess(16)
1508 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001509
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001510
Victor Stinner9b1f4742016-09-06 16:18:52 -07001511@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1512class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001513 @classmethod
1514 def setUpClass(cls):
1515 try:
1516 os.getrandom(1)
1517 except OSError as exc:
1518 if exc.errno == errno.ENOSYS:
1519 # Python compiled on a more recent Linux version
1520 # than the current Linux kernel
1521 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1522 else:
1523 raise
1524
Victor Stinner9b1f4742016-09-06 16:18:52 -07001525 def test_getrandom_type(self):
1526 data = os.getrandom(16)
1527 self.assertIsInstance(data, bytes)
1528 self.assertEqual(len(data), 16)
1529
1530 def test_getrandom0(self):
1531 empty = os.getrandom(0)
1532 self.assertEqual(empty, b'')
1533
1534 def test_getrandom_random(self):
1535 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1536
1537 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1538 # resource /dev/random
1539
1540 def test_getrandom_nonblock(self):
1541 # The call must not fail. Check also that the flag exists
1542 try:
1543 os.getrandom(1, os.GRND_NONBLOCK)
1544 except BlockingIOError:
1545 # System urandom is not initialized yet
1546 pass
1547
1548 def test_getrandom_value(self):
1549 data1 = os.getrandom(16)
1550 data2 = os.getrandom(16)
1551 self.assertNotEqual(data1, data2)
1552
1553
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001554# os.urandom() doesn't use a file descriptor when it is implemented with the
1555# getentropy() function, the getrandom() function or the getrandom() syscall
1556OS_URANDOM_DONT_USE_FD = (
1557 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1558 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1559 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001560
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001561@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1562 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001563@unittest.skipIf(sys.platform == "vxworks",
1564 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001565class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001566 @unittest.skipUnless(resource, "test requires the resource module")
1567 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001568 # Check urandom() failing when it is not able to open /dev/random.
1569 # We spawn a new process to make the test more robust (if getrlimit()
1570 # failed to restore the file descriptor limit after this, the whole
1571 # test suite would crash; this actually happened on the OS X Tiger
1572 # buildbot).
1573 code = """if 1:
1574 import errno
1575 import os
1576 import resource
1577
1578 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1579 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1580 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001581 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001582 except OSError as e:
1583 assert e.errno == errno.EMFILE, e.errno
1584 else:
1585 raise AssertionError("OSError not raised")
1586 """
1587 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001588
Antoine Pitroue472aea2014-04-26 14:33:03 +02001589 def test_urandom_fd_closed(self):
1590 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1591 # closed.
1592 code = """if 1:
1593 import os
1594 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001595 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001596 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001597 with test.support.SuppressCrashReport():
1598 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001599 sys.stdout.buffer.write(os.urandom(4))
1600 """
1601 rc, out, err = assert_python_ok('-Sc', code)
1602
1603 def test_urandom_fd_reopened(self):
1604 # Issue #21207: urandom() should detect its fd to /dev/urandom
1605 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001606 self.addCleanup(support.unlink, support.TESTFN)
1607 create_file(support.TESTFN, b"x" * 256)
1608
Antoine Pitroue472aea2014-04-26 14:33:03 +02001609 code = """if 1:
1610 import os
1611 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001612 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001613 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001614 with test.support.SuppressCrashReport():
1615 for fd in range(3, 256):
1616 try:
1617 os.close(fd)
1618 except OSError:
1619 pass
1620 else:
1621 # Found the urandom fd (XXX hopefully)
1622 break
1623 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001624 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001625 new_fd = f.fileno()
1626 # Issue #26935: posix allows new_fd and fd to be equal but
1627 # some libc implementations have dup2 return an error in this
1628 # case.
1629 if new_fd != fd:
1630 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001631 sys.stdout.buffer.write(os.urandom(4))
1632 sys.stdout.buffer.write(os.urandom(4))
1633 """.format(TESTFN=support.TESTFN)
1634 rc, out, err = assert_python_ok('-Sc', code)
1635 self.assertEqual(len(out), 8)
1636 self.assertNotEqual(out[0:4], out[4:8])
1637 rc, out2, err2 = assert_python_ok('-Sc', code)
1638 self.assertEqual(len(out2), 8)
1639 self.assertNotEqual(out2, out)
1640
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001641
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001642@contextlib.contextmanager
1643def _execvpe_mockup(defpath=None):
1644 """
1645 Stubs out execv and execve functions when used as context manager.
1646 Records exec calls. The mock execv and execve functions always raise an
1647 exception as they would normally never return.
1648 """
1649 # A list of tuples containing (function name, first arg, args)
1650 # of calls to execv or execve that have been made.
1651 calls = []
1652
1653 def mock_execv(name, *args):
1654 calls.append(('execv', name, args))
1655 raise RuntimeError("execv called")
1656
1657 def mock_execve(name, *args):
1658 calls.append(('execve', name, args))
1659 raise OSError(errno.ENOTDIR, "execve called")
1660
1661 try:
1662 orig_execv = os.execv
1663 orig_execve = os.execve
1664 orig_defpath = os.defpath
1665 os.execv = mock_execv
1666 os.execve = mock_execve
1667 if defpath is not None:
1668 os.defpath = defpath
1669 yield calls
1670 finally:
1671 os.execv = orig_execv
1672 os.execve = orig_execve
1673 os.defpath = orig_defpath
1674
pxinwrf2d7ac72019-05-21 18:46:37 +08001675@unittest.skipUnless(hasattr(os, 'execv'),
1676 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001677class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001678 @unittest.skipIf(USING_LINUXTHREADS,
1679 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001680 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001681 self.assertRaises(OSError, os.execvpe, 'no such app-',
1682 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001683
Steve Dowerbce26262016-11-19 19:17:26 -08001684 def test_execv_with_bad_arglist(self):
1685 self.assertRaises(ValueError, os.execv, 'notepad', ())
1686 self.assertRaises(ValueError, os.execv, 'notepad', [])
1687 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1688 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1689
Thomas Heller6790d602007-08-30 17:15:14 +00001690 def test_execvpe_with_bad_arglist(self):
1691 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001692 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1693 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001694
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001695 @unittest.skipUnless(hasattr(os, '_execvpe'),
1696 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001697 def _test_internal_execvpe(self, test_type):
1698 program_path = os.sep + 'absolutepath'
1699 if test_type is bytes:
1700 program = b'executable'
1701 fullpath = os.path.join(os.fsencode(program_path), program)
1702 native_fullpath = fullpath
1703 arguments = [b'progname', 'arg1', 'arg2']
1704 else:
1705 program = 'executable'
1706 arguments = ['progname', 'arg1', 'arg2']
1707 fullpath = os.path.join(program_path, program)
1708 if os.name != "nt":
1709 native_fullpath = os.fsencode(fullpath)
1710 else:
1711 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001712 env = {'spam': 'beans'}
1713
Victor Stinnerb745a742010-05-18 17:17:23 +00001714 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001715 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001716 self.assertRaises(RuntimeError,
1717 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001718 self.assertEqual(len(calls), 1)
1719 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1720
Victor Stinnerb745a742010-05-18 17:17:23 +00001721 # test os._execvpe() with a relative path:
1722 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001723 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001724 self.assertRaises(OSError,
1725 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001726 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001727 self.assertSequenceEqual(calls[0],
1728 ('execve', native_fullpath, (arguments, env)))
1729
1730 # test os._execvpe() with a relative path:
1731 # os.get_exec_path() reads the 'PATH' variable
1732 with _execvpe_mockup() as calls:
1733 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001734 if test_type is bytes:
1735 env_path[b'PATH'] = program_path
1736 else:
1737 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001738 self.assertRaises(OSError,
1739 os._execvpe, program, arguments, env=env_path)
1740 self.assertEqual(len(calls), 1)
1741 self.assertSequenceEqual(calls[0],
1742 ('execve', native_fullpath, (arguments, env_path)))
1743
1744 def test_internal_execvpe_str(self):
1745 self._test_internal_execvpe(str)
1746 if os.name != "nt":
1747 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001748
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001749 def test_execve_invalid_env(self):
1750 args = [sys.executable, '-c', 'pass']
1751
Ville Skyttä49b27342017-08-03 09:00:59 +03001752 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001753 newenv = os.environ.copy()
1754 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1755 with self.assertRaises(ValueError):
1756 os.execve(args[0], args, newenv)
1757
Ville Skyttä49b27342017-08-03 09:00:59 +03001758 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001759 newenv = os.environ.copy()
1760 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1761 with self.assertRaises(ValueError):
1762 os.execve(args[0], args, newenv)
1763
Ville Skyttä49b27342017-08-03 09:00:59 +03001764 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001765 newenv = os.environ.copy()
1766 newenv["FRUIT=ORANGE"] = "lemon"
1767 with self.assertRaises(ValueError):
1768 os.execve(args[0], args, newenv)
1769
Alexey Izbyshev83460312018-10-20 03:28:22 +03001770 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1771 def test_execve_with_empty_path(self):
1772 # bpo-32890: Check GetLastError() misuse
1773 try:
1774 os.execve('', ['arg'], {})
1775 except OSError as e:
1776 self.assertTrue(e.winerror is None or e.winerror != 0)
1777 else:
1778 self.fail('No OSError raised')
1779
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001780
Serhiy Storchaka43767632013-11-03 21:31:38 +02001781@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001782class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001783 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001784 try:
1785 os.stat(support.TESTFN)
1786 except FileNotFoundError:
1787 exists = False
1788 except OSError as exc:
1789 exists = True
1790 self.fail("file %s must not exist; os.stat failed with %s"
1791 % (support.TESTFN, exc))
1792 else:
1793 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001794
Thomas Wouters477c8d52006-05-27 19:21:47 +00001795 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001796 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001797
1798 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001799 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001800
1801 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001802 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001803
1804 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001805 self.addCleanup(support.unlink, support.TESTFN)
1806
Victor Stinnere77c9742016-03-25 10:28:23 +01001807 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001808 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001809
1810 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001811 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001812
Thomas Wouters477c8d52006-05-27 19:21:47 +00001813 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001814 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001815
Victor Stinnere77c9742016-03-25 10:28:23 +01001816
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001817class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001818 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001819 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1820 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001821 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001822 def get_single(f):
1823 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001824 if hasattr(os, f):
1825 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001826 return helper
1827 for f in singles:
1828 locals()["test_"+f] = get_single(f)
1829
Benjamin Peterson7522c742009-01-19 21:00:09 +00001830 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001831 try:
1832 f(support.make_bad_fd(), *args)
1833 except OSError as e:
1834 self.assertEqual(e.errno, errno.EBADF)
1835 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001836 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001837 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001838
Serhiy Storchaka43767632013-11-03 21:31:38 +02001839 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001840 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001841 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001842
Serhiy Storchaka43767632013-11-03 21:31:38 +02001843 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001844 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001845 fd = support.make_bad_fd()
1846 # Make sure none of the descriptors we are about to close are
1847 # currently valid (issue 6542).
1848 for i in range(10):
1849 try: os.fstat(fd+i)
1850 except OSError:
1851 pass
1852 else:
1853 break
1854 if i < 2:
1855 raise unittest.SkipTest(
1856 "Unable to acquire a range of invalid file descriptors")
1857 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001858
Serhiy Storchaka43767632013-11-03 21:31:38 +02001859 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001860 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001861 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001862
Serhiy Storchaka43767632013-11-03 21:31:38 +02001863 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001864 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001866
Serhiy Storchaka43767632013-11-03 21:31:38 +02001867 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001868 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001869 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001870
Serhiy Storchaka43767632013-11-03 21:31:38 +02001871 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001872 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001873 self.check(os.pathconf, "PC_NAME_MAX")
1874 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001875
Serhiy Storchaka43767632013-11-03 21:31:38 +02001876 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001877 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001878 self.check(os.truncate, 0)
1879 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001880
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001882 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001883 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001884
Serhiy Storchaka43767632013-11-03 21:31:38 +02001885 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001886 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001887 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001888
Victor Stinner57ddf782014-01-08 15:21:28 +01001889 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1890 def test_readv(self):
1891 buf = bytearray(10)
1892 self.check(os.readv, [buf])
1893
Serhiy Storchaka43767632013-11-03 21:31:38 +02001894 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001895 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001896 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001897
Serhiy Storchaka43767632013-11-03 21:31:38 +02001898 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001899 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001900 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001901
Victor Stinner57ddf782014-01-08 15:21:28 +01001902 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1903 def test_writev(self):
1904 self.check(os.writev, [b'abc'])
1905
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001906 def test_inheritable(self):
1907 self.check(os.get_inheritable)
1908 self.check(os.set_inheritable, True)
1909
1910 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1911 'needs os.get_blocking() and os.set_blocking()')
1912 def test_blocking(self):
1913 self.check(os.get_blocking)
1914 self.check(os.set_blocking, True)
1915
Brian Curtin1b9df392010-11-24 20:24:31 +00001916
1917class LinkTests(unittest.TestCase):
1918 def setUp(self):
1919 self.file1 = support.TESTFN
1920 self.file2 = os.path.join(support.TESTFN + "2")
1921
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001922 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001923 for file in (self.file1, self.file2):
1924 if os.path.exists(file):
1925 os.unlink(file)
1926
Brian Curtin1b9df392010-11-24 20:24:31 +00001927 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001928 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001929
xdegaye6a55d092017-11-12 17:57:04 +01001930 try:
1931 os.link(file1, file2)
1932 except PermissionError as e:
1933 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001934 with open(file1, "r") as f1, open(file2, "r") as f2:
1935 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1936
1937 def test_link(self):
1938 self._test_link(self.file1, self.file2)
1939
1940 def test_link_bytes(self):
1941 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1942 bytes(self.file2, sys.getfilesystemencoding()))
1943
Brian Curtinf498b752010-11-30 15:54:04 +00001944 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001945 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001946 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001947 except UnicodeError:
1948 raise unittest.SkipTest("Unable to encode for this platform.")
1949
Brian Curtinf498b752010-11-30 15:54:04 +00001950 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001951 self.file2 = self.file1 + "2"
1952 self._test_link(self.file1, self.file2)
1953
Serhiy Storchaka43767632013-11-03 21:31:38 +02001954@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1955class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001956 # uid_t and gid_t are 32-bit unsigned integers on Linux
1957 UID_OVERFLOW = (1 << 32)
1958 GID_OVERFLOW = (1 << 32)
1959
Serhiy Storchaka43767632013-11-03 21:31:38 +02001960 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1961 def test_setuid(self):
1962 if os.getuid() != 0:
1963 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001964 self.assertRaises(TypeError, os.setuid, 'not an int')
1965 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001966
Serhiy Storchaka43767632013-11-03 21:31:38 +02001967 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1968 def test_setgid(self):
1969 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1970 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001971 self.assertRaises(TypeError, os.setgid, 'not an int')
1972 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001973
Serhiy Storchaka43767632013-11-03 21:31:38 +02001974 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1975 def test_seteuid(self):
1976 if os.getuid() != 0:
1977 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001978 self.assertRaises(TypeError, os.setegid, 'not an int')
1979 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001980
Serhiy Storchaka43767632013-11-03 21:31:38 +02001981 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1982 def test_setegid(self):
1983 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1984 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001985 self.assertRaises(TypeError, os.setegid, 'not an int')
1986 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001987
Serhiy Storchaka43767632013-11-03 21:31:38 +02001988 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1989 def test_setreuid(self):
1990 if os.getuid() != 0:
1991 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001992 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1993 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1994 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1995 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001996
Serhiy Storchaka43767632013-11-03 21:31:38 +02001997 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1998 def test_setreuid_neg1(self):
1999 # Needs to accept -1. We run this in a subprocess to avoid
2000 # altering the test runner's process state (issue8045).
2001 subprocess.check_call([
2002 sys.executable, '-c',
2003 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002004
Serhiy Storchaka43767632013-11-03 21:31:38 +02002005 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2006 def test_setregid(self):
2007 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2008 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002009 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2010 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2011 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2012 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002013
Serhiy Storchaka43767632013-11-03 21:31:38 +02002014 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2015 def test_setregid_neg1(self):
2016 # Needs to accept -1. We run this in a subprocess to avoid
2017 # altering the test runner's process state (issue8045).
2018 subprocess.check_call([
2019 sys.executable, '-c',
2020 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002021
Serhiy Storchaka43767632013-11-03 21:31:38 +02002022@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2023class Pep383Tests(unittest.TestCase):
2024 def setUp(self):
2025 if support.TESTFN_UNENCODABLE:
2026 self.dir = support.TESTFN_UNENCODABLE
2027 elif support.TESTFN_NONASCII:
2028 self.dir = support.TESTFN_NONASCII
2029 else:
2030 self.dir = support.TESTFN
2031 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002032
Serhiy Storchaka43767632013-11-03 21:31:38 +02002033 bytesfn = []
2034 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002035 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002036 fn = os.fsencode(fn)
2037 except UnicodeEncodeError:
2038 return
2039 bytesfn.append(fn)
2040 add_filename(support.TESTFN_UNICODE)
2041 if support.TESTFN_UNENCODABLE:
2042 add_filename(support.TESTFN_UNENCODABLE)
2043 if support.TESTFN_NONASCII:
2044 add_filename(support.TESTFN_NONASCII)
2045 if not bytesfn:
2046 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002047
Serhiy Storchaka43767632013-11-03 21:31:38 +02002048 self.unicodefn = set()
2049 os.mkdir(self.dir)
2050 try:
2051 for fn in bytesfn:
2052 support.create_empty_file(os.path.join(self.bdir, fn))
2053 fn = os.fsdecode(fn)
2054 if fn in self.unicodefn:
2055 raise ValueError("duplicate filename")
2056 self.unicodefn.add(fn)
2057 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002058 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002059 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002060
Serhiy Storchaka43767632013-11-03 21:31:38 +02002061 def tearDown(self):
2062 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002063
Serhiy Storchaka43767632013-11-03 21:31:38 +02002064 def test_listdir(self):
2065 expected = self.unicodefn
2066 found = set(os.listdir(self.dir))
2067 self.assertEqual(found, expected)
2068 # test listdir without arguments
2069 current_directory = os.getcwd()
2070 try:
2071 os.chdir(os.sep)
2072 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2073 finally:
2074 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002075
Serhiy Storchaka43767632013-11-03 21:31:38 +02002076 def test_open(self):
2077 for fn in self.unicodefn:
2078 f = open(os.path.join(self.dir, fn), 'rb')
2079 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002080
Serhiy Storchaka43767632013-11-03 21:31:38 +02002081 @unittest.skipUnless(hasattr(os, 'statvfs'),
2082 "need os.statvfs()")
2083 def test_statvfs(self):
2084 # issue #9645
2085 for fn in self.unicodefn:
2086 # should not fail with file not found error
2087 fullname = os.path.join(self.dir, fn)
2088 os.statvfs(fullname)
2089
2090 def test_stat(self):
2091 for fn in self.unicodefn:
2092 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002093
Brian Curtineb24d742010-04-12 17:16:38 +00002094@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2095class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002096 def _kill(self, sig):
2097 # Start sys.executable as a subprocess and communicate from the
2098 # subprocess to the parent that the interpreter is ready. When it
2099 # becomes ready, send *sig* via os.kill to the subprocess and check
2100 # that the return code is equal to *sig*.
2101 import ctypes
2102 from ctypes import wintypes
2103 import msvcrt
2104
2105 # Since we can't access the contents of the process' stdout until the
2106 # process has exited, use PeekNamedPipe to see what's inside stdout
2107 # without waiting. This is done so we can tell that the interpreter
2108 # is started and running at a point where it could handle a signal.
2109 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2110 PeekNamedPipe.restype = wintypes.BOOL
2111 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2112 ctypes.POINTER(ctypes.c_char), # stdout buf
2113 wintypes.DWORD, # Buffer size
2114 ctypes.POINTER(wintypes.DWORD), # bytes read
2115 ctypes.POINTER(wintypes.DWORD), # bytes avail
2116 ctypes.POINTER(wintypes.DWORD)) # bytes left
2117 msg = "running"
2118 proc = subprocess.Popen([sys.executable, "-c",
2119 "import sys;"
2120 "sys.stdout.write('{}');"
2121 "sys.stdout.flush();"
2122 "input()".format(msg)],
2123 stdout=subprocess.PIPE,
2124 stderr=subprocess.PIPE,
2125 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002126 self.addCleanup(proc.stdout.close)
2127 self.addCleanup(proc.stderr.close)
2128 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002129
2130 count, max = 0, 100
2131 while count < max and proc.poll() is None:
2132 # Create a string buffer to store the result of stdout from the pipe
2133 buf = ctypes.create_string_buffer(len(msg))
2134 # Obtain the text currently in proc.stdout
2135 # Bytes read/avail/left are left as NULL and unused
2136 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2137 buf, ctypes.sizeof(buf), None, None, None)
2138 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2139 if buf.value:
2140 self.assertEqual(msg, buf.value.decode())
2141 break
2142 time.sleep(0.1)
2143 count += 1
2144 else:
2145 self.fail("Did not receive communication from the subprocess")
2146
Brian Curtineb24d742010-04-12 17:16:38 +00002147 os.kill(proc.pid, sig)
2148 self.assertEqual(proc.wait(), sig)
2149
2150 def test_kill_sigterm(self):
2151 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002152 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002153
2154 def test_kill_int(self):
2155 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002156 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002157
2158 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002159 tagname = "test_os_%s" % uuid.uuid1()
2160 m = mmap.mmap(-1, 1, tagname)
2161 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002162 # Run a script which has console control handling enabled.
2163 proc = subprocess.Popen([sys.executable,
2164 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002165 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002166 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2167 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002168 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002169 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002170 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002171 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002172 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002173 count += 1
2174 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002175 # Forcefully kill the process if we weren't able to signal it.
2176 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002177 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002178 os.kill(proc.pid, event)
2179 # proc.send_signal(event) could also be done here.
2180 # Allow time for the signal to be passed and the process to exit.
2181 time.sleep(0.5)
2182 if not proc.poll():
2183 # Forcefully kill the process if we weren't able to signal it.
2184 os.kill(proc.pid, signal.SIGINT)
2185 self.fail("subprocess did not stop on {}".format(name))
2186
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002187 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002188 def test_CTRL_C_EVENT(self):
2189 from ctypes import wintypes
2190 import ctypes
2191
2192 # Make a NULL value by creating a pointer with no argument.
2193 NULL = ctypes.POINTER(ctypes.c_int)()
2194 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2195 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2196 wintypes.BOOL)
2197 SetConsoleCtrlHandler.restype = wintypes.BOOL
2198
2199 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002200 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002201 # by subprocesses.
2202 SetConsoleCtrlHandler(NULL, 0)
2203
2204 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2205
2206 def test_CTRL_BREAK_EVENT(self):
2207 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2208
2209
Brian Curtind40e6f72010-07-08 21:39:08 +00002210@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002211class Win32ListdirTests(unittest.TestCase):
2212 """Test listdir on Windows."""
2213
2214 def setUp(self):
2215 self.created_paths = []
2216 for i in range(2):
2217 dir_name = 'SUB%d' % i
2218 dir_path = os.path.join(support.TESTFN, dir_name)
2219 file_name = 'FILE%d' % i
2220 file_path = os.path.join(support.TESTFN, file_name)
2221 os.makedirs(dir_path)
2222 with open(file_path, 'w') as f:
2223 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2224 self.created_paths.extend([dir_name, file_name])
2225 self.created_paths.sort()
2226
2227 def tearDown(self):
2228 shutil.rmtree(support.TESTFN)
2229
2230 def test_listdir_no_extended_path(self):
2231 """Test when the path is not an "extended" path."""
2232 # unicode
2233 self.assertEqual(
2234 sorted(os.listdir(support.TESTFN)),
2235 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002236
Tim Golden781bbeb2013-10-25 20:24:06 +01002237 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002238 self.assertEqual(
2239 sorted(os.listdir(os.fsencode(support.TESTFN))),
2240 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002241
2242 def test_listdir_extended_path(self):
2243 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002244 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002245 # unicode
2246 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2247 self.assertEqual(
2248 sorted(os.listdir(path)),
2249 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002250
Tim Golden781bbeb2013-10-25 20:24:06 +01002251 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002252 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2253 self.assertEqual(
2254 sorted(os.listdir(path)),
2255 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002256
2257
Berker Peksage0b5b202018-08-15 13:03:41 +03002258@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2259class ReadlinkTests(unittest.TestCase):
2260 filelink = 'readlinktest'
2261 filelink_target = os.path.abspath(__file__)
2262 filelinkb = os.fsencode(filelink)
2263 filelinkb_target = os.fsencode(filelink_target)
2264
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002265 def assertPathEqual(self, left, right):
2266 left = os.path.normcase(left)
2267 right = os.path.normcase(right)
2268 if sys.platform == 'win32':
2269 # Bad practice to blindly strip the prefix as it may be required to
2270 # correctly refer to the file, but we're only comparing paths here.
2271 has_prefix = lambda p: p.startswith(
2272 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2273 if has_prefix(left):
2274 left = left[4:]
2275 if has_prefix(right):
2276 right = right[4:]
2277 self.assertEqual(left, right)
2278
Berker Peksage0b5b202018-08-15 13:03:41 +03002279 def setUp(self):
2280 self.assertTrue(os.path.exists(self.filelink_target))
2281 self.assertTrue(os.path.exists(self.filelinkb_target))
2282 self.assertFalse(os.path.exists(self.filelink))
2283 self.assertFalse(os.path.exists(self.filelinkb))
2284
2285 def test_not_symlink(self):
2286 filelink_target = FakePath(self.filelink_target)
2287 self.assertRaises(OSError, os.readlink, self.filelink_target)
2288 self.assertRaises(OSError, os.readlink, filelink_target)
2289
2290 def test_missing_link(self):
2291 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2292 self.assertRaises(FileNotFoundError, os.readlink,
2293 FakePath('missing-link'))
2294
2295 @support.skip_unless_symlink
2296 def test_pathlike(self):
2297 os.symlink(self.filelink_target, self.filelink)
2298 self.addCleanup(support.unlink, self.filelink)
2299 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002300 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002301
2302 @support.skip_unless_symlink
2303 def test_pathlike_bytes(self):
2304 os.symlink(self.filelinkb_target, self.filelinkb)
2305 self.addCleanup(support.unlink, self.filelinkb)
2306 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002307 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002308 self.assertIsInstance(path, bytes)
2309
2310 @support.skip_unless_symlink
2311 def test_bytes(self):
2312 os.symlink(self.filelinkb_target, self.filelinkb)
2313 self.addCleanup(support.unlink, self.filelinkb)
2314 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002315 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002316 self.assertIsInstance(path, bytes)
2317
2318
Tim Golden781bbeb2013-10-25 20:24:06 +01002319@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002320@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002321class Win32SymlinkTests(unittest.TestCase):
2322 filelink = 'filelinktest'
2323 filelink_target = os.path.abspath(__file__)
2324 dirlink = 'dirlinktest'
2325 dirlink_target = os.path.dirname(filelink_target)
2326 missing_link = 'missing link'
2327
2328 def setUp(self):
2329 assert os.path.exists(self.dirlink_target)
2330 assert os.path.exists(self.filelink_target)
2331 assert not os.path.exists(self.dirlink)
2332 assert not os.path.exists(self.filelink)
2333 assert not os.path.exists(self.missing_link)
2334
2335 def tearDown(self):
2336 if os.path.exists(self.filelink):
2337 os.remove(self.filelink)
2338 if os.path.exists(self.dirlink):
2339 os.rmdir(self.dirlink)
2340 if os.path.lexists(self.missing_link):
2341 os.remove(self.missing_link)
2342
2343 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002344 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002345 self.assertTrue(os.path.exists(self.dirlink))
2346 self.assertTrue(os.path.isdir(self.dirlink))
2347 self.assertTrue(os.path.islink(self.dirlink))
2348 self.check_stat(self.dirlink, self.dirlink_target)
2349
2350 def test_file_link(self):
2351 os.symlink(self.filelink_target, self.filelink)
2352 self.assertTrue(os.path.exists(self.filelink))
2353 self.assertTrue(os.path.isfile(self.filelink))
2354 self.assertTrue(os.path.islink(self.filelink))
2355 self.check_stat(self.filelink, self.filelink_target)
2356
2357 def _create_missing_dir_link(self):
2358 'Create a "directory" link to a non-existent target'
2359 linkname = self.missing_link
2360 if os.path.lexists(linkname):
2361 os.remove(linkname)
2362 target = r'c:\\target does not exist.29r3c740'
2363 assert not os.path.exists(target)
2364 target_is_dir = True
2365 os.symlink(target, linkname, target_is_dir)
2366
2367 def test_remove_directory_link_to_missing_target(self):
2368 self._create_missing_dir_link()
2369 # For compatibility with Unix, os.remove will check the
2370 # directory status and call RemoveDirectory if the symlink
2371 # was created with target_is_dir==True.
2372 os.remove(self.missing_link)
2373
Brian Curtind40e6f72010-07-08 21:39:08 +00002374 def test_isdir_on_directory_link_to_missing_target(self):
2375 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002376 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002377
Brian Curtind40e6f72010-07-08 21:39:08 +00002378 def test_rmdir_on_directory_link_to_missing_target(self):
2379 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002380 os.rmdir(self.missing_link)
2381
2382 def check_stat(self, link, target):
2383 self.assertEqual(os.stat(link), os.stat(target))
2384 self.assertNotEqual(os.lstat(link), os.stat(link))
2385
Brian Curtind25aef52011-06-13 15:16:04 -05002386 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002387 self.assertEqual(os.stat(bytes_link), os.stat(target))
2388 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002389
2390 def test_12084(self):
2391 level1 = os.path.abspath(support.TESTFN)
2392 level2 = os.path.join(level1, "level2")
2393 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002394 self.addCleanup(support.rmtree, level1)
2395
2396 os.mkdir(level1)
2397 os.mkdir(level2)
2398 os.mkdir(level3)
2399
2400 file1 = os.path.abspath(os.path.join(level1, "file1"))
2401 create_file(file1)
2402
2403 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002404 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002405 os.chdir(level2)
2406 link = os.path.join(level2, "link")
2407 os.symlink(os.path.relpath(file1), "link")
2408 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002409
Victor Stinnerae39d232016-03-24 17:12:55 +01002410 # Check os.stat calls from the same dir as the link
2411 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002412
Victor Stinnerae39d232016-03-24 17:12:55 +01002413 # Check os.stat calls from a dir below the link
2414 os.chdir(level1)
2415 self.assertEqual(os.stat(file1),
2416 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002417
Victor Stinnerae39d232016-03-24 17:12:55 +01002418 # Check os.stat calls from a dir above the link
2419 os.chdir(level3)
2420 self.assertEqual(os.stat(file1),
2421 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002422 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002423 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002424
SSE43c34aad2018-02-13 00:10:35 +07002425 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2426 and os.path.exists(r'C:\ProgramData'),
2427 'Test directories not found')
2428 def test_29248(self):
2429 # os.symlink() calls CreateSymbolicLink, which creates
2430 # the reparse data buffer with the print name stored
2431 # first, so the offset is always 0. CreateSymbolicLink
2432 # stores the "PrintName" DOS path (e.g. "C:\") first,
2433 # with an offset of 0, followed by the "SubstituteName"
2434 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2435 # the other hand, seems to have been created manually
2436 # with an inverted order.
2437 target = os.readlink(r'C:\Users\All Users')
2438 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2439
Steve Dower6921e732018-03-05 14:26:08 -08002440 def test_buffer_overflow(self):
2441 # Older versions would have a buffer overflow when detecting
2442 # whether a link source was a directory. This test ensures we
2443 # no longer crash, but does not otherwise validate the behavior
2444 segment = 'X' * 27
2445 path = os.path.join(*[segment] * 10)
2446 test_cases = [
2447 # overflow with absolute src
2448 ('\\' + path, segment),
2449 # overflow dest with relative src
2450 (segment, path),
2451 # overflow when joining src
2452 (path[:180], path[:180]),
2453 ]
2454 for src, dest in test_cases:
2455 try:
2456 os.symlink(src, dest)
2457 except FileNotFoundError:
2458 pass
2459 else:
2460 try:
2461 os.remove(dest)
2462 except OSError:
2463 pass
2464 # Also test with bytes, since that is a separate code path.
2465 try:
2466 os.symlink(os.fsencode(src), os.fsencode(dest))
2467 except FileNotFoundError:
2468 pass
2469 else:
2470 try:
2471 os.remove(dest)
2472 except OSError:
2473 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002474
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002475 def test_appexeclink(self):
2476 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002477 if not os.path.isdir(root):
2478 self.skipTest("test requires a WindowsApps directory")
2479
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002480 aliases = [os.path.join(root, a)
2481 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2482
2483 for alias in aliases:
2484 if support.verbose:
2485 print()
2486 print("Testing with", alias)
2487 st = os.lstat(alias)
2488 self.assertEqual(st, os.stat(alias))
2489 self.assertFalse(stat.S_ISLNK(st.st_mode))
2490 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2491 # testing the first one we see is sufficient
2492 break
2493 else:
2494 self.skipTest("test requires an app execution alias")
2495
Tim Golden0321cf22014-05-05 19:46:17 +01002496@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2497class Win32JunctionTests(unittest.TestCase):
2498 junction = 'junctiontest'
2499 junction_target = os.path.dirname(os.path.abspath(__file__))
2500
2501 def setUp(self):
2502 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002503 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002504
2505 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002506 if os.path.lexists(self.junction):
2507 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002508
2509 def test_create_junction(self):
2510 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002511 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002512 self.assertTrue(os.path.exists(self.junction))
2513 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002514 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2515 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002516
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002517 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002518 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002519 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2520 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002521
2522 def test_unlink_removes_junction(self):
2523 _winapi.CreateJunction(self.junction_target, self.junction)
2524 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002525 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002526
2527 os.unlink(self.junction)
2528 self.assertFalse(os.path.exists(self.junction))
2529
Mark Becwarb82bfac2019-02-02 16:08:23 -05002530@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2531class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002532 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002533 nt = support.import_module('nt')
2534 ctypes = support.import_module('ctypes')
2535 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002536
2537 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2538 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2539
2540 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2541 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2542 ctypes.wintypes.LPDWORD)
2543
2544 # This is a pseudo-handle that doesn't need to be closed
2545 hproc = kernel.GetCurrentProcess()
2546
2547 handle_count = ctypes.wintypes.DWORD()
2548 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2549 self.assertEqual(1, ok)
2550
2551 before_count = handle_count.value
2552
2553 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002554 filenames = [
2555 r'\\?\C:',
2556 r'\\?\NUL',
2557 r'\\?\CONIN',
2558 __file__,
2559 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002560
Berker Peksag6ef726a2019-04-22 18:46:28 +03002561 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002562 for name in filenames:
2563 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002564 nt._getfinalpathname(name)
2565 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002566 # Failure is expected
2567 pass
2568 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002569 os.stat(name)
2570 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002571 pass
2572
2573 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2574 self.assertEqual(1, ok)
2575
2576 handle_delta = handle_count.value - before_count
2577
2578 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002579
Jason R. Coombs3a092862013-05-27 23:21:28 -04002580@support.skip_unless_symlink
2581class NonLocalSymlinkTests(unittest.TestCase):
2582
2583 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002584 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002585 Create this structure:
2586
2587 base
2588 \___ some_dir
2589 """
2590 os.makedirs('base/some_dir')
2591
2592 def tearDown(self):
2593 shutil.rmtree('base')
2594
2595 def test_directory_link_nonlocal(self):
2596 """
2597 The symlink target should resolve relative to the link, not relative
2598 to the current directory.
2599
2600 Then, link base/some_link -> base/some_dir and ensure that some_link
2601 is resolved as a directory.
2602
2603 In issue13772, it was discovered that directory detection failed if
2604 the symlink target was not specified relative to the current
2605 directory, which was a defect in the implementation.
2606 """
2607 src = os.path.join('base', 'some_link')
2608 os.symlink('some_dir', src)
2609 assert os.path.isdir(src)
2610
2611
Victor Stinnere8d51452010-08-19 01:05:19 +00002612class FSEncodingTests(unittest.TestCase):
2613 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002614 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2615 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002616
Victor Stinnere8d51452010-08-19 01:05:19 +00002617 def test_identity(self):
2618 # assert fsdecode(fsencode(x)) == x
2619 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2620 try:
2621 bytesfn = os.fsencode(fn)
2622 except UnicodeEncodeError:
2623 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002625
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002626
Brett Cannonefb00c02012-02-29 18:31:31 -05002627
2628class DeviceEncodingTests(unittest.TestCase):
2629
2630 def test_bad_fd(self):
2631 # Return None when an fd doesn't actually exist.
2632 self.assertIsNone(os.device_encoding(123456))
2633
Paul Monson62dfd7d2019-04-25 11:36:45 -07002634 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002635 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002636 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002637 def test_device_encoding(self):
2638 encoding = os.device_encoding(0)
2639 self.assertIsNotNone(encoding)
2640 self.assertTrue(codecs.lookup(encoding))
2641
2642
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002643class PidTests(unittest.TestCase):
2644 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2645 def test_getppid(self):
2646 p = subprocess.Popen([sys.executable, '-c',
2647 'import os; print(os.getppid())'],
2648 stdout=subprocess.PIPE)
2649 stdout, _ = p.communicate()
2650 # We are the parent of our subprocess
2651 self.assertEqual(int(stdout), os.getpid())
2652
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002653 def test_waitpid(self):
2654 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002655 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002656 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002657 status = os.waitpid(pid, 0)
2658 self.assertEqual(status, (pid, 0))
2659
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002660
Victor Stinner4659ccf2016-09-14 10:57:00 +02002661class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002662 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002663 self.exitcode = 17
2664
2665 filename = support.TESTFN
2666 self.addCleanup(support.unlink, filename)
2667
2668 if not with_env:
2669 code = 'import sys; sys.exit(%s)' % self.exitcode
2670 else:
2671 self.env = dict(os.environ)
2672 # create an unique key
2673 self.key = str(uuid.uuid4())
2674 self.env[self.key] = self.key
2675 # read the variable from os.environ to check that it exists
2676 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2677 % (self.key, self.exitcode))
2678
2679 with open(filename, "w") as fp:
2680 fp.write(code)
2681
Berker Peksag81816462016-09-15 20:19:47 +03002682 args = [sys.executable, filename]
2683 if use_bytes:
2684 args = [os.fsencode(a) for a in args]
2685 self.env = {os.fsencode(k): os.fsencode(v)
2686 for k, v in self.env.items()}
2687
2688 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002689
Berker Peksag4af23d72016-09-15 20:32:44 +03002690 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002691 def test_spawnl(self):
2692 args = self.create_args()
2693 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2694 self.assertEqual(exitcode, self.exitcode)
2695
Berker Peksag4af23d72016-09-15 20:32:44 +03002696 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002697 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002698 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002699 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2700 self.assertEqual(exitcode, self.exitcode)
2701
Berker Peksag4af23d72016-09-15 20:32:44 +03002702 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002703 def test_spawnlp(self):
2704 args = self.create_args()
2705 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2706 self.assertEqual(exitcode, self.exitcode)
2707
Berker Peksag4af23d72016-09-15 20:32:44 +03002708 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002709 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002710 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002711 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2712 self.assertEqual(exitcode, self.exitcode)
2713
Berker Peksag4af23d72016-09-15 20:32:44 +03002714 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002715 def test_spawnv(self):
2716 args = self.create_args()
2717 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2718 self.assertEqual(exitcode, self.exitcode)
2719
Berker Peksag4af23d72016-09-15 20:32:44 +03002720 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002721 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002722 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002723 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2724 self.assertEqual(exitcode, self.exitcode)
2725
Berker Peksag4af23d72016-09-15 20:32:44 +03002726 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002727 def test_spawnvp(self):
2728 args = self.create_args()
2729 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2730 self.assertEqual(exitcode, self.exitcode)
2731
Berker Peksag4af23d72016-09-15 20:32:44 +03002732 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002733 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002734 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002735 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2736 self.assertEqual(exitcode, self.exitcode)
2737
Berker Peksag4af23d72016-09-15 20:32:44 +03002738 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002739 def test_nowait(self):
2740 args = self.create_args()
2741 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2742 result = os.waitpid(pid, 0)
2743 self.assertEqual(result[0], pid)
2744 status = result[1]
2745 if hasattr(os, 'WIFEXITED'):
2746 self.assertTrue(os.WIFEXITED(status))
2747 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2748 else:
2749 self.assertEqual(status, self.exitcode << 8)
2750
Berker Peksag4af23d72016-09-15 20:32:44 +03002751 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002752 def test_spawnve_bytes(self):
2753 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2754 args = self.create_args(with_env=True, use_bytes=True)
2755 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2756 self.assertEqual(exitcode, self.exitcode)
2757
Steve Dower859fd7b2016-11-19 18:53:19 -08002758 @requires_os_func('spawnl')
2759 def test_spawnl_noargs(self):
2760 args = self.create_args()
2761 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002762 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002763
2764 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002765 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002766 args = self.create_args()
2767 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002768 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002769
2770 @requires_os_func('spawnv')
2771 def test_spawnv_noargs(self):
2772 args = self.create_args()
2773 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2774 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002775 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2776 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002777
2778 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002779 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002780 args = self.create_args()
2781 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2782 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002783 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2784 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002785
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002786 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002787 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002788
Ville Skyttä49b27342017-08-03 09:00:59 +03002789 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002790 newenv = os.environ.copy()
2791 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2792 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002793 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002794 except ValueError:
2795 pass
2796 else:
2797 self.assertEqual(exitcode, 127)
2798
Ville Skyttä49b27342017-08-03 09:00:59 +03002799 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002800 newenv = os.environ.copy()
2801 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2802 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002803 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002804 except ValueError:
2805 pass
2806 else:
2807 self.assertEqual(exitcode, 127)
2808
Ville Skyttä49b27342017-08-03 09:00:59 +03002809 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002810 newenv = os.environ.copy()
2811 newenv["FRUIT=ORANGE"] = "lemon"
2812 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002813 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002814 except ValueError:
2815 pass
2816 else:
2817 self.assertEqual(exitcode, 127)
2818
Ville Skyttä49b27342017-08-03 09:00:59 +03002819 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002820 filename = support.TESTFN
2821 self.addCleanup(support.unlink, filename)
2822 with open(filename, "w") as fp:
2823 fp.write('import sys, os\n'
2824 'if os.getenv("FRUIT") != "orange=lemon":\n'
2825 ' raise AssertionError')
2826 args = [sys.executable, filename]
2827 newenv = os.environ.copy()
2828 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002829 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002830 self.assertEqual(exitcode, 0)
2831
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002832 @requires_os_func('spawnve')
2833 def test_spawnve_invalid_env(self):
2834 self._test_invalid_env(os.spawnve)
2835
2836 @requires_os_func('spawnvpe')
2837 def test_spawnvpe_invalid_env(self):
2838 self._test_invalid_env(os.spawnvpe)
2839
Serhiy Storchaka77703942017-06-25 07:33:01 +03002840
Brian Curtin0151b8e2010-09-24 13:43:43 +00002841# The introduction of this TestCase caused at least two different errors on
2842# *nix buildbots. Temporarily skip this to let the buildbots move along.
2843@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002844@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2845class LoginTests(unittest.TestCase):
2846 def test_getlogin(self):
2847 user_name = os.getlogin()
2848 self.assertNotEqual(len(user_name), 0)
2849
2850
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002851@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2852 "needs os.getpriority and os.setpriority")
2853class ProgramPriorityTests(unittest.TestCase):
2854 """Tests for os.getpriority() and os.setpriority()."""
2855
2856 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002857
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002858 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2859 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2860 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002861 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2862 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002863 raise unittest.SkipTest("unable to reliably test setpriority "
2864 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002865 else:
2866 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002867 finally:
2868 try:
2869 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2870 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002871 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002872 raise
2873
2874
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002875class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002876
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002877 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002878
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002879 def __init__(self, conn):
2880 asynchat.async_chat.__init__(self, conn)
2881 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002882 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002883 self.closed = False
2884 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002885
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002886 def handle_read(self):
2887 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002888 if self.accumulate:
2889 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002890
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002891 def get_data(self):
2892 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002893
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002894 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002895 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002897
2898 def handle_error(self):
2899 raise
2900
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002901 def __init__(self, address):
2902 threading.Thread.__init__(self)
2903 asyncore.dispatcher.__init__(self)
2904 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2905 self.bind(address)
2906 self.listen(5)
2907 self.host, self.port = self.socket.getsockname()[:2]
2908 self.handler_instance = None
2909 self._active = False
2910 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002911
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912 # --- public API
2913
2914 @property
2915 def running(self):
2916 return self._active
2917
2918 def start(self):
2919 assert not self.running
2920 self.__flag = threading.Event()
2921 threading.Thread.start(self)
2922 self.__flag.wait()
2923
2924 def stop(self):
2925 assert self.running
2926 self._active = False
2927 self.join()
2928
2929 def wait(self):
2930 # wait for handler connection to be closed, then stop the server
2931 while not getattr(self.handler_instance, "closed", False):
2932 time.sleep(0.001)
2933 self.stop()
2934
2935 # --- internals
2936
2937 def run(self):
2938 self._active = True
2939 self.__flag.set()
2940 while self._active and asyncore.socket_map:
2941 self._active_lock.acquire()
2942 asyncore.loop(timeout=0.001, count=1)
2943 self._active_lock.release()
2944 asyncore.close_all()
2945
2946 def handle_accept(self):
2947 conn, addr = self.accept()
2948 self.handler_instance = self.Handler(conn)
2949
2950 def handle_connect(self):
2951 self.close()
2952 handle_read = handle_connect
2953
2954 def writable(self):
2955 return 0
2956
2957 def handle_error(self):
2958 raise
2959
2960
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002961@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2962class TestSendfile(unittest.TestCase):
2963
Victor Stinner8c663fd2017-11-08 14:44:44 -08002964 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002965 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002966 not sys.platform.startswith("solaris") and \
2967 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002968 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2969 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002970 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2971 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002972
2973 @classmethod
2974 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002975 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002976 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002977
2978 @classmethod
2979 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002980 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002981 support.unlink(support.TESTFN)
2982
2983 def setUp(self):
2984 self.server = SendfileTestServer((support.HOST, 0))
2985 self.server.start()
2986 self.client = socket.socket()
2987 self.client.connect((self.server.host, self.server.port))
2988 self.client.settimeout(1)
2989 # synchronize by waiting for "220 ready" response
2990 self.client.recv(1024)
2991 self.sockno = self.client.fileno()
2992 self.file = open(support.TESTFN, 'rb')
2993 self.fileno = self.file.fileno()
2994
2995 def tearDown(self):
2996 self.file.close()
2997 self.client.close()
2998 if self.server.running:
2999 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02003000 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003001
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003002 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003003 """A higher level wrapper representing how an application is
3004 supposed to use sendfile().
3005 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003006 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003007 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003008 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003009 except OSError as err:
3010 if err.errno == errno.ECONNRESET:
3011 # disconnected
3012 raise
3013 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3014 # we have to retry send data
3015 continue
3016 else:
3017 raise
3018
3019 def test_send_whole_file(self):
3020 # normal send
3021 total_sent = 0
3022 offset = 0
3023 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003024 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003025 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3026 if sent == 0:
3027 break
3028 offset += sent
3029 total_sent += sent
3030 self.assertTrue(sent <= nbytes)
3031 self.assertEqual(offset, total_sent)
3032
3033 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003034 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003035 self.client.close()
3036 self.server.wait()
3037 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003038 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003039 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003040
3041 def test_send_at_certain_offset(self):
3042 # start sending a file at a certain offset
3043 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003044 offset = len(self.DATA) // 2
3045 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003046 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003047 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003048 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3049 if sent == 0:
3050 break
3051 offset += sent
3052 total_sent += sent
3053 self.assertTrue(sent <= nbytes)
3054
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003055 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003056 self.client.close()
3057 self.server.wait()
3058 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003059 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003060 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003061 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003062 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003063
3064 def test_offset_overflow(self):
3065 # specify an offset > file size
3066 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003067 try:
3068 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3069 except OSError as e:
3070 # Solaris can raise EINVAL if offset >= file length, ignore.
3071 if e.errno != errno.EINVAL:
3072 raise
3073 else:
3074 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003075 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003076 self.client.close()
3077 self.server.wait()
3078 data = self.server.handler_instance.get_data()
3079 self.assertEqual(data, b'')
3080
3081 def test_invalid_offset(self):
3082 with self.assertRaises(OSError) as cm:
3083 os.sendfile(self.sockno, self.fileno, -1, 4096)
3084 self.assertEqual(cm.exception.errno, errno.EINVAL)
3085
Martin Panterbf19d162015-09-09 01:01:13 +00003086 def test_keywords(self):
3087 # Keyword arguments should be supported
3088 os.sendfile(out=self.sockno, offset=0, count=4096,
3089 **{'in': self.fileno})
3090 if self.SUPPORT_HEADERS_TRAILERS:
3091 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00003092 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003093
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003094 # --- headers / trailers tests
3095
Serhiy Storchaka43767632013-11-03 21:31:38 +02003096 @requires_headers_trailers
3097 def test_headers(self):
3098 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003099 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003100 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003101 headers=[b"x" * 512, b"y" * 256])
3102 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003103 total_sent += sent
3104 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003105 while total_sent < len(expected_data):
3106 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003107 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3108 offset, nbytes)
3109 if sent == 0:
3110 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003111 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003112 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003113 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003114
Serhiy Storchaka43767632013-11-03 21:31:38 +02003115 self.assertEqual(total_sent, len(expected_data))
3116 self.client.close()
3117 self.server.wait()
3118 data = self.server.handler_instance.get_data()
3119 self.assertEqual(hash(data), hash(expected_data))
3120
3121 @requires_headers_trailers
3122 def test_trailers(self):
3123 TESTFN2 = support.TESTFN + "2"
3124 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003125
3126 self.addCleanup(support.unlink, TESTFN2)
3127 create_file(TESTFN2, file_data)
3128
3129 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003130 os.sendfile(self.sockno, f.fileno(), 0, 5,
3131 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003132 self.client.close()
3133 self.server.wait()
3134 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003135 self.assertEqual(data, b"abcde123456789")
3136
3137 @requires_headers_trailers
3138 @requires_32b
3139 def test_headers_overflow_32bits(self):
3140 self.server.handler_instance.accumulate = False
3141 with self.assertRaises(OSError) as cm:
3142 os.sendfile(self.sockno, self.fileno, 0, 0,
3143 headers=[b"x" * 2**16] * 2**15)
3144 self.assertEqual(cm.exception.errno, errno.EINVAL)
3145
3146 @requires_headers_trailers
3147 @requires_32b
3148 def test_trailers_overflow_32bits(self):
3149 self.server.handler_instance.accumulate = False
3150 with self.assertRaises(OSError) as cm:
3151 os.sendfile(self.sockno, self.fileno, 0, 0,
3152 trailers=[b"x" * 2**16] * 2**15)
3153 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003154
Serhiy Storchaka43767632013-11-03 21:31:38 +02003155 @requires_headers_trailers
3156 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3157 'test needs os.SF_NODISKIO')
3158 def test_flags(self):
3159 try:
3160 os.sendfile(self.sockno, self.fileno, 0, 4096,
3161 flags=os.SF_NODISKIO)
3162 except OSError as err:
3163 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3164 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003165
3166
Larry Hastings9cf065c2012-06-22 16:30:09 -07003167def supports_extended_attributes():
3168 if not hasattr(os, "setxattr"):
3169 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003170
Larry Hastings9cf065c2012-06-22 16:30:09 -07003171 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003172 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003173 try:
3174 os.setxattr(fp.fileno(), b"user.test", b"")
3175 except OSError:
3176 return False
3177 finally:
3178 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003179
3180 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003181
3182
3183@unittest.skipUnless(supports_extended_attributes(),
3184 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003185# Kernels < 2.6.39 don't respect setxattr flags.
3186@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003187class ExtendedAttributeTests(unittest.TestCase):
3188
Larry Hastings9cf065c2012-06-22 16:30:09 -07003189 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003190 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003191 self.addCleanup(support.unlink, fn)
3192 create_file(fn)
3193
Benjamin Peterson799bd802011-08-31 22:15:17 -04003194 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003195 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003196 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003197
Victor Stinnerf12e5062011-10-16 22:12:03 +02003198 init_xattr = listxattr(fn)
3199 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003200
Larry Hastings9cf065c2012-06-22 16:30:09 -07003201 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003202 xattr = set(init_xattr)
3203 xattr.add("user.test")
3204 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003205 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3206 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3207 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003208
Benjamin Peterson799bd802011-08-31 22:15:17 -04003209 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003210 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003211 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003212
Benjamin Peterson799bd802011-08-31 22:15:17 -04003213 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003214 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003215 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003216
Larry Hastings9cf065c2012-06-22 16:30:09 -07003217 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003218 xattr.add("user.test2")
3219 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003220 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003221
Benjamin Peterson799bd802011-08-31 22:15:17 -04003222 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003223 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003224 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003225
Victor Stinnerf12e5062011-10-16 22:12:03 +02003226 xattr.remove("user.test")
3227 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003228 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3229 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3230 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3231 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003232 many = sorted("user.test{}".format(i) for i in range(100))
3233 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003234 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003235 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003236
Larry Hastings9cf065c2012-06-22 16:30:09 -07003237 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003238 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003239 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003240
3241 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3242 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003243
3244 def test_simple(self):
3245 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3246 os.listxattr)
3247
3248 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003249 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3250 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003251
3252 def test_fds(self):
3253 def getxattr(path, *args):
3254 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003255 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003256 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003257 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003258 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003259 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003260 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003261 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003262 def listxattr(path, *args):
3263 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003264 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003265 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3266
3267
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003268@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3269class TermsizeTests(unittest.TestCase):
3270 def test_does_not_crash(self):
3271 """Check if get_terminal_size() returns a meaningful value.
3272
3273 There's no easy portable way to actually check the size of the
3274 terminal, so let's check if it returns something sensible instead.
3275 """
3276 try:
3277 size = os.get_terminal_size()
3278 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003279 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003280 # Under win32 a generic OSError can be thrown if the
3281 # handle cannot be retrieved
3282 self.skipTest("failed to query terminal size")
3283 raise
3284
Antoine Pitroucfade362012-02-08 23:48:59 +01003285 self.assertGreaterEqual(size.columns, 0)
3286 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003287
3288 def test_stty_match(self):
3289 """Check if stty returns the same results
3290
3291 stty actually tests stdin, so get_terminal_size is invoked on
3292 stdin explicitly. If stty succeeded, then get_terminal_size()
3293 should work too.
3294 """
3295 try:
3296 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003297 except (FileNotFoundError, subprocess.CalledProcessError,
3298 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003299 self.skipTest("stty invocation failed")
3300 expected = (int(size[1]), int(size[0])) # reversed order
3301
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003302 try:
3303 actual = os.get_terminal_size(sys.__stdin__.fileno())
3304 except OSError as e:
3305 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3306 # Under win32 a generic OSError can be thrown if the
3307 # handle cannot be retrieved
3308 self.skipTest("failed to query terminal size")
3309 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003310 self.assertEqual(expected, actual)
3311
3312
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003313@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003314@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003315class MemfdCreateTests(unittest.TestCase):
3316 def test_memfd_create(self):
3317 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3318 self.assertNotEqual(fd, -1)
3319 self.addCleanup(os.close, fd)
3320 self.assertFalse(os.get_inheritable(fd))
3321 with open(fd, "wb", closefd=False) as f:
3322 f.write(b'memfd_create')
3323 self.assertEqual(f.tell(), 12)
3324
3325 fd2 = os.memfd_create("Hi")
3326 self.addCleanup(os.close, fd2)
3327 self.assertFalse(os.get_inheritable(fd2))
3328
3329
Victor Stinner292c8352012-10-30 02:17:38 +01003330class OSErrorTests(unittest.TestCase):
3331 def setUp(self):
3332 class Str(str):
3333 pass
3334
Victor Stinnerafe17062012-10-31 22:47:43 +01003335 self.bytes_filenames = []
3336 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003337 if support.TESTFN_UNENCODABLE is not None:
3338 decoded = support.TESTFN_UNENCODABLE
3339 else:
3340 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003341 self.unicode_filenames.append(decoded)
3342 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003343 if support.TESTFN_UNDECODABLE is not None:
3344 encoded = support.TESTFN_UNDECODABLE
3345 else:
3346 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003347 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003348 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003349 self.bytes_filenames.append(memoryview(encoded))
3350
3351 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003352
3353 def test_oserror_filename(self):
3354 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003355 (self.filenames, os.chdir,),
3356 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003357 (self.filenames, os.lstat,),
3358 (self.filenames, os.open, os.O_RDONLY),
3359 (self.filenames, os.rmdir,),
3360 (self.filenames, os.stat,),
3361 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003362 ]
3363 if sys.platform == "win32":
3364 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003365 (self.bytes_filenames, os.rename, b"dst"),
3366 (self.bytes_filenames, os.replace, b"dst"),
3367 (self.unicode_filenames, os.rename, "dst"),
3368 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003369 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003370 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003371 else:
3372 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003373 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003374 (self.filenames, os.rename, "dst"),
3375 (self.filenames, os.replace, "dst"),
3376 ))
3377 if hasattr(os, "chown"):
3378 funcs.append((self.filenames, os.chown, 0, 0))
3379 if hasattr(os, "lchown"):
3380 funcs.append((self.filenames, os.lchown, 0, 0))
3381 if hasattr(os, "truncate"):
3382 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003383 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003384 funcs.append((self.filenames, os.chflags, 0))
3385 if hasattr(os, "lchflags"):
3386 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003387 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003388 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003389 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003390 if sys.platform == "win32":
3391 funcs.append((self.bytes_filenames, os.link, b"dst"))
3392 funcs.append((self.unicode_filenames, os.link, "dst"))
3393 else:
3394 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003395 if hasattr(os, "listxattr"):
3396 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003397 (self.filenames, os.listxattr,),
3398 (self.filenames, os.getxattr, "user.test"),
3399 (self.filenames, os.setxattr, "user.test", b'user'),
3400 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003401 ))
3402 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003403 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003404 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003405 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003406
Steve Dowercc16be82016-09-08 10:35:16 -07003407
Victor Stinnerafe17062012-10-31 22:47:43 +01003408 for filenames, func, *func_args in funcs:
3409 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003410 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003411 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003412 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003413 else:
3414 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3415 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003416 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003417 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003418 except UnicodeDecodeError:
3419 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003420 else:
3421 self.fail("No exception thrown by {}".format(func))
3422
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003423class CPUCountTests(unittest.TestCase):
3424 def test_cpu_count(self):
3425 cpus = os.cpu_count()
3426 if cpus is not None:
3427 self.assertIsInstance(cpus, int)
3428 self.assertGreater(cpus, 0)
3429 else:
3430 self.skipTest("Could not determine the number of CPUs")
3431
Victor Stinnerdaf45552013-08-28 00:53:59 +02003432
3433class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003434 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003435 fd = os.open(__file__, os.O_RDONLY)
3436 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003437 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003438
Victor Stinnerdaf45552013-08-28 00:53:59 +02003439 os.set_inheritable(fd, True)
3440 self.assertEqual(os.get_inheritable(fd), True)
3441
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003442 @unittest.skipIf(fcntl is None, "need fcntl")
3443 def test_get_inheritable_cloexec(self):
3444 fd = os.open(__file__, os.O_RDONLY)
3445 self.addCleanup(os.close, fd)
3446 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003447
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003448 # clear FD_CLOEXEC flag
3449 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3450 flags &= ~fcntl.FD_CLOEXEC
3451 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003452
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003453 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003454
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003455 @unittest.skipIf(fcntl is None, "need fcntl")
3456 def test_set_inheritable_cloexec(self):
3457 fd = os.open(__file__, os.O_RDONLY)
3458 self.addCleanup(os.close, fd)
3459 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3460 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003461
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003462 os.set_inheritable(fd, True)
3463 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3464 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003465
Victor Stinnerdaf45552013-08-28 00:53:59 +02003466 def test_open(self):
3467 fd = os.open(__file__, os.O_RDONLY)
3468 self.addCleanup(os.close, fd)
3469 self.assertEqual(os.get_inheritable(fd), False)
3470
3471 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3472 def test_pipe(self):
3473 rfd, wfd = os.pipe()
3474 self.addCleanup(os.close, rfd)
3475 self.addCleanup(os.close, wfd)
3476 self.assertEqual(os.get_inheritable(rfd), False)
3477 self.assertEqual(os.get_inheritable(wfd), False)
3478
3479 def test_dup(self):
3480 fd1 = os.open(__file__, os.O_RDONLY)
3481 self.addCleanup(os.close, fd1)
3482
3483 fd2 = os.dup(fd1)
3484 self.addCleanup(os.close, fd2)
3485 self.assertEqual(os.get_inheritable(fd2), False)
3486
Zackery Spytz5be66602019-08-23 12:38:41 -06003487 def test_dup_standard_stream(self):
3488 fd = os.dup(1)
3489 self.addCleanup(os.close, fd)
3490 self.assertGreater(fd, 0)
3491
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003492 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3493 def test_dup_nul(self):
3494 # os.dup() was creating inheritable fds for character files.
3495 fd1 = os.open('NUL', os.O_RDONLY)
3496 self.addCleanup(os.close, fd1)
3497 fd2 = os.dup(fd1)
3498 self.addCleanup(os.close, fd2)
3499 self.assertFalse(os.get_inheritable(fd2))
3500
Victor Stinnerdaf45552013-08-28 00:53:59 +02003501 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3502 def test_dup2(self):
3503 fd = os.open(__file__, os.O_RDONLY)
3504 self.addCleanup(os.close, fd)
3505
3506 # inheritable by default
3507 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003508 self.addCleanup(os.close, fd2)
3509 self.assertEqual(os.dup2(fd, fd2), fd2)
3510 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003511
3512 # force non-inheritable
3513 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003514 self.addCleanup(os.close, fd3)
3515 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3516 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003517
3518 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3519 def test_openpty(self):
3520 master_fd, slave_fd = os.openpty()
3521 self.addCleanup(os.close, master_fd)
3522 self.addCleanup(os.close, slave_fd)
3523 self.assertEqual(os.get_inheritable(master_fd), False)
3524 self.assertEqual(os.get_inheritable(slave_fd), False)
3525
3526
Brett Cannon3f9183b2016-08-26 14:44:48 -07003527class PathTConverterTests(unittest.TestCase):
3528 # tuples of (function name, allows fd arguments, additional arguments to
3529 # function, cleanup function)
3530 functions = [
3531 ('stat', True, (), None),
3532 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003533 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003534 ('chflags', False, (0,), None),
3535 ('lchflags', False, (0,), None),
3536 ('open', False, (0,), getattr(os, 'close', None)),
3537 ]
3538
3539 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003540 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003541 if os.name == 'nt':
3542 bytes_fspath = bytes_filename = None
3543 else:
3544 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003545 bytes_fspath = FakePath(bytes_filename)
3546 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003547 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003548 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003549
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003550 int_fspath = FakePath(fd)
3551 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003552
3553 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3554 with self.subTest(name=name):
3555 try:
3556 fn = getattr(os, name)
3557 except AttributeError:
3558 continue
3559
Brett Cannon8f96a302016-08-26 19:30:11 -07003560 for path in (str_filename, bytes_filename, str_fspath,
3561 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003562 if path is None:
3563 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003564 with self.subTest(name=name, path=path):
3565 result = fn(path, *extra_args)
3566 if cleanup_fn is not None:
3567 cleanup_fn(result)
3568
3569 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003570 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003571 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003572
3573 if allow_fd:
3574 result = fn(fd, *extra_args) # should not fail
3575 if cleanup_fn is not None:
3576 cleanup_fn(result)
3577 else:
3578 with self.assertRaisesRegex(
3579 TypeError,
3580 'os.PathLike'):
3581 fn(fd, *extra_args)
3582
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003583 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003584 msg = r'__fspath__\(\) to return str or bytes, not %s'
3585 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003586 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003587 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003588 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003589 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003590 os.stat(FakePath(object()))
3591
Brett Cannon3f9183b2016-08-26 14:44:48 -07003592
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003593@unittest.skipUnless(hasattr(os, 'get_blocking'),
3594 'needs os.get_blocking() and os.set_blocking()')
3595class BlockingTests(unittest.TestCase):
3596 def test_blocking(self):
3597 fd = os.open(__file__, os.O_RDONLY)
3598 self.addCleanup(os.close, fd)
3599 self.assertEqual(os.get_blocking(fd), True)
3600
3601 os.set_blocking(fd, False)
3602 self.assertEqual(os.get_blocking(fd), False)
3603
3604 os.set_blocking(fd, True)
3605 self.assertEqual(os.get_blocking(fd), True)
3606
3607
Yury Selivanov97e2e062014-09-26 12:33:06 -04003608
3609class ExportsTests(unittest.TestCase):
3610 def test_os_all(self):
3611 self.assertIn('open', os.__all__)
3612 self.assertIn('walk', os.__all__)
3613
3614
Victor Stinner6036e442015-03-08 01:58:04 +01003615class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003616 check_no_resource_warning = support.check_no_resource_warning
3617
Victor Stinner6036e442015-03-08 01:58:04 +01003618 def setUp(self):
3619 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003620 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003621 self.addCleanup(support.rmtree, self.path)
3622 os.mkdir(self.path)
3623
3624 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003625 path = self.bytes_path if isinstance(name, bytes) else self.path
3626 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003627 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003628 return filename
3629
3630 def get_entries(self, names):
3631 entries = dict((entry.name, entry)
3632 for entry in os.scandir(self.path))
3633 self.assertEqual(sorted(entries.keys()), names)
3634 return entries
3635
3636 def assert_stat_equal(self, stat1, stat2, skip_fields):
3637 if skip_fields:
3638 for attr in dir(stat1):
3639 if not attr.startswith("st_"):
3640 continue
3641 if attr in ("st_dev", "st_ino", "st_nlink"):
3642 continue
3643 self.assertEqual(getattr(stat1, attr),
3644 getattr(stat2, attr),
3645 (stat1, stat2, attr))
3646 else:
3647 self.assertEqual(stat1, stat2)
3648
3649 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003650 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003651 self.assertEqual(entry.name, name)
3652 self.assertEqual(entry.path, os.path.join(self.path, name))
3653 self.assertEqual(entry.inode(),
3654 os.stat(entry.path, follow_symlinks=False).st_ino)
3655
3656 entry_stat = os.stat(entry.path)
3657 self.assertEqual(entry.is_dir(),
3658 stat.S_ISDIR(entry_stat.st_mode))
3659 self.assertEqual(entry.is_file(),
3660 stat.S_ISREG(entry_stat.st_mode))
3661 self.assertEqual(entry.is_symlink(),
3662 os.path.islink(entry.path))
3663
3664 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3665 self.assertEqual(entry.is_dir(follow_symlinks=False),
3666 stat.S_ISDIR(entry_lstat.st_mode))
3667 self.assertEqual(entry.is_file(follow_symlinks=False),
3668 stat.S_ISREG(entry_lstat.st_mode))
3669
3670 self.assert_stat_equal(entry.stat(),
3671 entry_stat,
3672 os.name == 'nt' and not is_symlink)
3673 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3674 entry_lstat,
3675 os.name == 'nt')
3676
3677 def test_attributes(self):
3678 link = hasattr(os, 'link')
3679 symlink = support.can_symlink()
3680
3681 dirname = os.path.join(self.path, "dir")
3682 os.mkdir(dirname)
3683 filename = self.create_file("file.txt")
3684 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003685 try:
3686 os.link(filename, os.path.join(self.path, "link_file.txt"))
3687 except PermissionError as e:
3688 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003689 if symlink:
3690 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3691 target_is_directory=True)
3692 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3693
3694 names = ['dir', 'file.txt']
3695 if link:
3696 names.append('link_file.txt')
3697 if symlink:
3698 names.extend(('symlink_dir', 'symlink_file.txt'))
3699 entries = self.get_entries(names)
3700
3701 entry = entries['dir']
3702 self.check_entry(entry, 'dir', True, False, False)
3703
3704 entry = entries['file.txt']
3705 self.check_entry(entry, 'file.txt', False, True, False)
3706
3707 if link:
3708 entry = entries['link_file.txt']
3709 self.check_entry(entry, 'link_file.txt', False, True, False)
3710
3711 if symlink:
3712 entry = entries['symlink_dir']
3713 self.check_entry(entry, 'symlink_dir', True, False, True)
3714
3715 entry = entries['symlink_file.txt']
3716 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3717
3718 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003719 path = self.bytes_path if isinstance(name, bytes) else self.path
3720 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003721 self.assertEqual(len(entries), 1)
3722
3723 entry = entries[0]
3724 self.assertEqual(entry.name, name)
3725 return entry
3726
Brett Cannon96881cd2016-06-10 14:37:21 -07003727 def create_file_entry(self, name='file.txt'):
3728 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003729 return self.get_entry(os.path.basename(filename))
3730
3731 def test_current_directory(self):
3732 filename = self.create_file()
3733 old_dir = os.getcwd()
3734 try:
3735 os.chdir(self.path)
3736
3737 # call scandir() without parameter: it must list the content
3738 # of the current directory
3739 entries = dict((entry.name, entry) for entry in os.scandir())
3740 self.assertEqual(sorted(entries.keys()),
3741 [os.path.basename(filename)])
3742 finally:
3743 os.chdir(old_dir)
3744
3745 def test_repr(self):
3746 entry = self.create_file_entry()
3747 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3748
Brett Cannon96881cd2016-06-10 14:37:21 -07003749 def test_fspath_protocol(self):
3750 entry = self.create_file_entry()
3751 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3752
3753 def test_fspath_protocol_bytes(self):
3754 bytes_filename = os.fsencode('bytesfile.txt')
3755 bytes_entry = self.create_file_entry(name=bytes_filename)
3756 fspath = os.fspath(bytes_entry)
3757 self.assertIsInstance(fspath, bytes)
3758 self.assertEqual(fspath,
3759 os.path.join(os.fsencode(self.path),bytes_filename))
3760
Victor Stinner6036e442015-03-08 01:58:04 +01003761 def test_removed_dir(self):
3762 path = os.path.join(self.path, 'dir')
3763
3764 os.mkdir(path)
3765 entry = self.get_entry('dir')
3766 os.rmdir(path)
3767
3768 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3769 if os.name == 'nt':
3770 self.assertTrue(entry.is_dir())
3771 self.assertFalse(entry.is_file())
3772 self.assertFalse(entry.is_symlink())
3773 if os.name == 'nt':
3774 self.assertRaises(FileNotFoundError, entry.inode)
3775 # don't fail
3776 entry.stat()
3777 entry.stat(follow_symlinks=False)
3778 else:
3779 self.assertGreater(entry.inode(), 0)
3780 self.assertRaises(FileNotFoundError, entry.stat)
3781 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3782
3783 def test_removed_file(self):
3784 entry = self.create_file_entry()
3785 os.unlink(entry.path)
3786
3787 self.assertFalse(entry.is_dir())
3788 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3789 if os.name == 'nt':
3790 self.assertTrue(entry.is_file())
3791 self.assertFalse(entry.is_symlink())
3792 if os.name == 'nt':
3793 self.assertRaises(FileNotFoundError, entry.inode)
3794 # don't fail
3795 entry.stat()
3796 entry.stat(follow_symlinks=False)
3797 else:
3798 self.assertGreater(entry.inode(), 0)
3799 self.assertRaises(FileNotFoundError, entry.stat)
3800 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3801
3802 def test_broken_symlink(self):
3803 if not support.can_symlink():
3804 return self.skipTest('cannot create symbolic link')
3805
3806 filename = self.create_file("file.txt")
3807 os.symlink(filename,
3808 os.path.join(self.path, "symlink.txt"))
3809 entries = self.get_entries(['file.txt', 'symlink.txt'])
3810 entry = entries['symlink.txt']
3811 os.unlink(filename)
3812
3813 self.assertGreater(entry.inode(), 0)
3814 self.assertFalse(entry.is_dir())
3815 self.assertFalse(entry.is_file()) # broken symlink returns False
3816 self.assertFalse(entry.is_dir(follow_symlinks=False))
3817 self.assertFalse(entry.is_file(follow_symlinks=False))
3818 self.assertTrue(entry.is_symlink())
3819 self.assertRaises(FileNotFoundError, entry.stat)
3820 # don't fail
3821 entry.stat(follow_symlinks=False)
3822
3823 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003824 self.create_file("file.txt")
3825
3826 path_bytes = os.fsencode(self.path)
3827 entries = list(os.scandir(path_bytes))
3828 self.assertEqual(len(entries), 1, entries)
3829 entry = entries[0]
3830
3831 self.assertEqual(entry.name, b'file.txt')
3832 self.assertEqual(entry.path,
3833 os.fsencode(os.path.join(self.path, 'file.txt')))
3834
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003835 def test_bytes_like(self):
3836 self.create_file("file.txt")
3837
3838 for cls in bytearray, memoryview:
3839 path_bytes = cls(os.fsencode(self.path))
3840 with self.assertWarns(DeprecationWarning):
3841 entries = list(os.scandir(path_bytes))
3842 self.assertEqual(len(entries), 1, entries)
3843 entry = entries[0]
3844
3845 self.assertEqual(entry.name, b'file.txt')
3846 self.assertEqual(entry.path,
3847 os.fsencode(os.path.join(self.path, 'file.txt')))
3848 self.assertIs(type(entry.name), bytes)
3849 self.assertIs(type(entry.path), bytes)
3850
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003851 @unittest.skipUnless(os.listdir in os.supports_fd,
3852 'fd support for listdir required for this test.')
3853 def test_fd(self):
3854 self.assertIn(os.scandir, os.supports_fd)
3855 self.create_file('file.txt')
3856 expected_names = ['file.txt']
3857 if support.can_symlink():
3858 os.symlink('file.txt', os.path.join(self.path, 'link'))
3859 expected_names.append('link')
3860
3861 fd = os.open(self.path, os.O_RDONLY)
3862 try:
3863 with os.scandir(fd) as it:
3864 entries = list(it)
3865 names = [entry.name for entry in entries]
3866 self.assertEqual(sorted(names), expected_names)
3867 self.assertEqual(names, os.listdir(fd))
3868 for entry in entries:
3869 self.assertEqual(entry.path, entry.name)
3870 self.assertEqual(os.fspath(entry), entry.name)
3871 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3872 if os.stat in os.supports_dir_fd:
3873 st = os.stat(entry.name, dir_fd=fd)
3874 self.assertEqual(entry.stat(), st)
3875 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3876 self.assertEqual(entry.stat(follow_symlinks=False), st)
3877 finally:
3878 os.close(fd)
3879
Victor Stinner6036e442015-03-08 01:58:04 +01003880 def test_empty_path(self):
3881 self.assertRaises(FileNotFoundError, os.scandir, '')
3882
3883 def test_consume_iterator_twice(self):
3884 self.create_file("file.txt")
3885 iterator = os.scandir(self.path)
3886
3887 entries = list(iterator)
3888 self.assertEqual(len(entries), 1, entries)
3889
3890 # check than consuming the iterator twice doesn't raise exception
3891 entries2 = list(iterator)
3892 self.assertEqual(len(entries2), 0, entries2)
3893
3894 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003895 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003896 self.assertRaises(TypeError, os.scandir, obj)
3897
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003898 def test_close(self):
3899 self.create_file("file.txt")
3900 self.create_file("file2.txt")
3901 iterator = os.scandir(self.path)
3902 next(iterator)
3903 iterator.close()
3904 # multiple closes
3905 iterator.close()
3906 with self.check_no_resource_warning():
3907 del iterator
3908
3909 def test_context_manager(self):
3910 self.create_file("file.txt")
3911 self.create_file("file2.txt")
3912 with os.scandir(self.path) as iterator:
3913 next(iterator)
3914 with self.check_no_resource_warning():
3915 del iterator
3916
3917 def test_context_manager_close(self):
3918 self.create_file("file.txt")
3919 self.create_file("file2.txt")
3920 with os.scandir(self.path) as iterator:
3921 next(iterator)
3922 iterator.close()
3923
3924 def test_context_manager_exception(self):
3925 self.create_file("file.txt")
3926 self.create_file("file2.txt")
3927 with self.assertRaises(ZeroDivisionError):
3928 with os.scandir(self.path) as iterator:
3929 next(iterator)
3930 1/0
3931 with self.check_no_resource_warning():
3932 del iterator
3933
3934 def test_resource_warning(self):
3935 self.create_file("file.txt")
3936 self.create_file("file2.txt")
3937 iterator = os.scandir(self.path)
3938 next(iterator)
3939 with self.assertWarns(ResourceWarning):
3940 del iterator
3941 support.gc_collect()
3942 # exhausted iterator
3943 iterator = os.scandir(self.path)
3944 list(iterator)
3945 with self.check_no_resource_warning():
3946 del iterator
3947
Victor Stinner6036e442015-03-08 01:58:04 +01003948
Ethan Furmancdc08792016-06-02 15:06:09 -07003949class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003950
3951 # Abstracted so it can be overridden to test pure Python implementation
3952 # if a C version is provided.
3953 fspath = staticmethod(os.fspath)
3954
Ethan Furmancdc08792016-06-02 15:06:09 -07003955 def test_return_bytes(self):
3956 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003957 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003958
3959 def test_return_string(self):
3960 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003961 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003962
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003963 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003964 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003965 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003966
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003967 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003968 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3969 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3970
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003971 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003972 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3973 self.assertTrue(issubclass(FakePath, os.PathLike))
3974 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003975
Ethan Furmancdc08792016-06-02 15:06:09 -07003976 def test_garbage_in_exception_out(self):
3977 vapor = type('blah', (), {})
3978 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003979 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003980
3981 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003982 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003983
Brett Cannon044283a2016-07-15 10:41:49 -07003984 def test_bad_pathlike(self):
3985 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003986 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003987 # __fspath__ attribute that is not callable.
3988 c = type('foo', (), {})
3989 c.__fspath__ = 1
3990 self.assertRaises(TypeError, self.fspath, c())
3991 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003992 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003993 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003994
Victor Stinnerc29b5852017-11-02 07:28:27 -07003995
3996class TimesTests(unittest.TestCase):
3997 def test_times(self):
3998 times = os.times()
3999 self.assertIsInstance(times, os.times_result)
4000
4001 for field in ('user', 'system', 'children_user', 'children_system',
4002 'elapsed'):
4003 value = getattr(times, field)
4004 self.assertIsInstance(value, float)
4005
4006 if os.name == 'nt':
4007 self.assertEqual(times.children_user, 0)
4008 self.assertEqual(times.children_system, 0)
4009 self.assertEqual(times.elapsed, 0)
4010
4011
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004012# Only test if the C version is provided, otherwise TestPEP519 already tested
4013# the pure Python implementation.
4014if hasattr(os, "_fspath"):
4015 class TestPEP519PurePython(TestPEP519):
4016
4017 """Explicitly test the pure Python implementation of os.fspath()."""
4018
4019 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004020
4021
Fred Drake2e2be372001-09-20 21:33:42 +00004022if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004023 unittest.main()