blob: 440cd6c1cf73c3b0ffb4b3a21b7d96f8ebcedfdd [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
R David Murrayf2ad1732014-12-25 18:36:56 -050060root_in_posix = False
61if hasattr(os, 'geteuid'):
62 root_in_posix = (os.geteuid() == 0)
63
Mark Dickinson7cf03892010-04-16 13:45:35 +000064# Detect whether we're on a Linux system that uses the (now outdated
65# and unmaintained) linuxthreads threading library. There's an issue
66# when combining linuxthreads with a failed execv call: see
67# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020068if hasattr(sys, 'thread_info') and sys.thread_info.version:
69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
70else:
71 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000072
Stefan Krahebee49a2013-01-17 15:31:00 +010073# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
Berker Peksag4af23d72016-09-15 20:32:44 +030077def requires_os_func(name):
78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
79
80
Victor Stinnerae39d232016-03-24 17:12:55 +010081def create_file(filename, content=b'content'):
82 with open(filename, "xb", 0) as fp:
83 fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000150# Tests creating TESTFN
151class FileTests(unittest.TestCase):
152 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000153 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000155 tearDown = setUp
156
157 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000159 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000160 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161
Christian Heimesfdab48e2008-01-20 09:06:41 +0000162 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
164 # We must allocate two consecutive file descriptors, otherwise
165 # it will mess up other file descriptors (perhaps even the three
166 # standard ones).
167 second = os.dup(first)
168 try:
169 retries = 0
170 while second != first + 1:
171 os.close(first)
172 retries += 1
173 if retries > 10:
174 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000175 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000176 first, second = second, os.dup(second)
177 finally:
178 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000179 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000180 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000181 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000182
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000183 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000184 def test_rename(self):
185 path = support.TESTFN
186 old = sys.getrefcount(path)
187 self.assertRaises(TypeError, os.rename, path, 0)
188 new = sys.getrefcount(path)
189 self.assertEqual(old, new)
190
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000191 def test_read(self):
192 with open(support.TESTFN, "w+b") as fobj:
193 fobj.write(b"spam")
194 fobj.flush()
195 fd = fobj.fileno()
196 os.lseek(fd, 0, 0)
197 s = os.read(fd, 4)
198 self.assertEqual(type(s), bytes)
199 self.assertEqual(s, b"spam")
200
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200201 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200202 # Skip the test on 32-bit platforms: the number of bytes must fit in a
203 # Py_ssize_t type
204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
205 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
207 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200208 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200210
211 # Issue #21932: Make sure that os.read() does not raise an
212 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200213 with open(support.TESTFN, "rb") as fp:
214 data = os.read(fp.fileno(), size)
215
Victor Stinner8c663fd2017-11-08 14:44:44 -0800216 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200217 # operating system is free to return less bytes than requested.
218 self.assertEqual(data, b'test')
219
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000220 def test_write(self):
221 # os.write() accepts bytes- and buffer-like objects but not strings
222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
223 self.assertRaises(TypeError, os.write, fd, "beans")
224 os.write(fd, b"bacon\n")
225 os.write(fd, bytearray(b"eggs\n"))
226 os.write(fd, memoryview(b"spam\n"))
227 os.close(fd)
228 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000229 self.assertEqual(fobj.read().splitlines(),
230 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000231
Victor Stinnere0daff12011-03-20 23:36:35 +0100232 def write_windows_console(self, *args):
233 retcode = subprocess.call(args,
234 # use a new console to not flood the test output
235 creationflags=subprocess.CREATE_NEW_CONSOLE,
236 # use a shell to hide the console window (SW_HIDE)
237 shell=True)
238 self.assertEqual(retcode, 0)
239
240 @unittest.skipUnless(sys.platform == 'win32',
241 'test specific to the Windows console')
242 def test_write_windows_console(self):
243 # Issue #11395: the Windows console returns an error (12: not enough
244 # space error) on writing into stdout if stdout mode is binary and the
245 # length is greater than 66,000 bytes (or less, depending on heap
246 # usage).
247 code = "print('x' * 100000)"
248 self.write_windows_console(sys.executable, "-c", code)
249 self.write_windows_console(sys.executable, "-u", "-c", code)
250
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000251 def fdopen_helper(self, *args):
252 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253 f = os.fdopen(fd, *args)
254 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000255
256 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
258 os.close(fd)
259
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000260 self.fdopen_helper()
261 self.fdopen_helper('r')
262 self.fdopen_helper('r', 100)
263
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100264 def test_replace(self):
265 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100266 self.addCleanup(support.unlink, support.TESTFN)
267 self.addCleanup(support.unlink, TESTFN2)
268
269 create_file(support.TESTFN, b"1")
270 create_file(TESTFN2, b"2")
271
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100272 os.replace(support.TESTFN, TESTFN2)
273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
274 with open(TESTFN2, 'r') as f:
275 self.assertEqual(f.read(), "1")
276
Martin Panterbf19d162015-09-09 01:01:13 +0000277 def test_open_keywords(self):
278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
279 dir_fd=None)
280 os.close(f)
281
282 def test_symlink_keywords(self):
283 symlink = support.get_attribute(os, "symlink")
284 try:
285 symlink(src='target', dst=support.TESTFN,
286 target_is_directory=False, dir_fd=None)
287 except (NotImplementedError, OSError):
288 pass # No OS support or unprivileged user
289
Pablo Galindoaac4d032019-05-31 19:39:47 +0100290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
291 def test_copy_file_range_invalid_values(self):
292 with self.assertRaises(ValueError):
293 os.copy_file_range(0, 1, -10)
294
295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
296 def test_copy_file_range(self):
297 TESTFN2 = support.TESTFN + ".3"
298 data = b'0123456789'
299
300 create_file(support.TESTFN, data)
301 self.addCleanup(support.unlink, support.TESTFN)
302
303 in_file = open(support.TESTFN, 'rb')
304 self.addCleanup(in_file.close)
305 in_fd = in_file.fileno()
306
307 out_file = open(TESTFN2, 'w+b')
308 self.addCleanup(support.unlink, TESTFN2)
309 self.addCleanup(out_file.close)
310 out_fd = out_file.fileno()
311
312 try:
313 i = os.copy_file_range(in_fd, out_fd, 5)
314 except OSError as e:
315 # Handle the case in which Python was compiled
316 # in a system with the syscall but without support
317 # in the kernel.
318 if e.errno != errno.ENOSYS:
319 raise
320 self.skipTest(e)
321 else:
322 # The number of copied bytes can be less than
323 # the number of bytes originally requested.
324 self.assertIn(i, range(0, 6));
325
326 with open(TESTFN2, 'rb') as in_file:
327 self.assertEqual(in_file.read(), data[:i])
328
329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
330 def test_copy_file_range_offset(self):
331 TESTFN4 = support.TESTFN + ".4"
332 data = b'0123456789'
333 bytes_to_copy = 6
334 in_skip = 3
335 out_seek = 5
336
337 create_file(support.TESTFN, data)
338 self.addCleanup(support.unlink, support.TESTFN)
339
340 in_file = open(support.TESTFN, 'rb')
341 self.addCleanup(in_file.close)
342 in_fd = in_file.fileno()
343
344 out_file = open(TESTFN4, 'w+b')
345 self.addCleanup(support.unlink, TESTFN4)
346 self.addCleanup(out_file.close)
347 out_fd = out_file.fileno()
348
349 try:
350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
351 offset_src=in_skip,
352 offset_dst=out_seek)
353 except OSError as e:
354 # Handle the case in which Python was compiled
355 # in a system with the syscall but without support
356 # in the kernel.
357 if e.errno != errno.ENOSYS:
358 raise
359 self.skipTest(e)
360 else:
361 # The number of copied bytes can be less than
362 # the number of bytes originally requested.
363 self.assertIn(i, range(0, bytes_to_copy+1));
364
365 with open(TESTFN4, 'rb') as in_file:
366 read = in_file.read()
367 # seeked bytes (5) are zero'ed
368 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
369 # 012 are skipped (in_skip)
370 # 345678 are copied in the file (in_skip + bytes_to_copy)
371 self.assertEqual(read[out_seek:],
372 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Victor Stinner47aacc82015-06-12 17:26:23 +0200594
595class UtimeTests(unittest.TestCase):
596 def setUp(self):
597 self.dirname = support.TESTFN
598 self.fname = os.path.join(self.dirname, "f1")
599
600 self.addCleanup(support.rmtree, self.dirname)
601 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100602 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200603
Victor Stinner47aacc82015-06-12 17:26:23 +0200604 def support_subsecond(self, filename):
605 # Heuristic to check if the filesystem supports timestamp with
606 # subsecond resolution: check if float and int timestamps are different
607 st = os.stat(filename)
608 return ((st.st_atime != st[7])
609 or (st.st_mtime != st[8])
610 or (st.st_ctime != st[9]))
611
612 def _test_utime(self, set_time, filename=None):
613 if not filename:
614 filename = self.fname
615
616 support_subsecond = self.support_subsecond(filename)
617 if support_subsecond:
618 # Timestamp with a resolution of 1 microsecond (10^-6).
619 #
620 # The resolution of the C internal function used by os.utime()
621 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
622 # test with a resolution of 1 ns requires more work:
623 # see the issue #15745.
624 atime_ns = 1002003000 # 1.002003 seconds
625 mtime_ns = 4005006000 # 4.005006 seconds
626 else:
627 # use a resolution of 1 second
628 atime_ns = 5 * 10**9
629 mtime_ns = 8 * 10**9
630
631 set_time(filename, (atime_ns, mtime_ns))
632 st = os.stat(filename)
633
634 if support_subsecond:
635 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
636 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
637 else:
638 self.assertEqual(st.st_atime, atime_ns * 1e-9)
639 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
640 self.assertEqual(st.st_atime_ns, atime_ns)
641 self.assertEqual(st.st_mtime_ns, mtime_ns)
642
643 def test_utime(self):
644 def set_time(filename, ns):
645 # test the ns keyword parameter
646 os.utime(filename, ns=ns)
647 self._test_utime(set_time)
648
649 @staticmethod
650 def ns_to_sec(ns):
651 # Convert a number of nanosecond (int) to a number of seconds (float).
652 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
653 # issue, os.utime() rounds towards minus infinity.
654 return (ns * 1e-9) + 0.5e-9
655
656 def test_utime_by_indexed(self):
657 # pass times as floating point seconds as the second indexed parameter
658 def set_time(filename, ns):
659 atime_ns, mtime_ns = ns
660 atime = self.ns_to_sec(atime_ns)
661 mtime = self.ns_to_sec(mtime_ns)
662 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
663 # or utime(time_t)
664 os.utime(filename, (atime, mtime))
665 self._test_utime(set_time)
666
667 def test_utime_by_times(self):
668 def set_time(filename, ns):
669 atime_ns, mtime_ns = ns
670 atime = self.ns_to_sec(atime_ns)
671 mtime = self.ns_to_sec(mtime_ns)
672 # test the times keyword parameter
673 os.utime(filename, times=(atime, mtime))
674 self._test_utime(set_time)
675
676 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
677 "follow_symlinks support for utime required "
678 "for this test.")
679 def test_utime_nofollow_symlinks(self):
680 def set_time(filename, ns):
681 # use follow_symlinks=False to test utimensat(timespec)
682 # or lutimes(timeval)
683 os.utime(filename, ns=ns, follow_symlinks=False)
684 self._test_utime(set_time)
685
686 @unittest.skipUnless(os.utime in os.supports_fd,
687 "fd support for utime required for this test.")
688 def test_utime_fd(self):
689 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100690 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200691 # use a file descriptor to test futimens(timespec)
692 # or futimes(timeval)
693 os.utime(fp.fileno(), ns=ns)
694 self._test_utime(set_time)
695
696 @unittest.skipUnless(os.utime in os.supports_dir_fd,
697 "dir_fd support for utime required for this test.")
698 def test_utime_dir_fd(self):
699 def set_time(filename, ns):
700 dirname, name = os.path.split(filename)
701 dirfd = os.open(dirname, os.O_RDONLY)
702 try:
703 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
704 os.utime(name, dir_fd=dirfd, ns=ns)
705 finally:
706 os.close(dirfd)
707 self._test_utime(set_time)
708
709 def test_utime_directory(self):
710 def set_time(filename, ns):
711 # test calling os.utime() on a directory
712 os.utime(filename, ns=ns)
713 self._test_utime(set_time, filename=self.dirname)
714
715 def _test_utime_current(self, set_time):
716 # Get the system clock
717 current = time.time()
718
719 # Call os.utime() to set the timestamp to the current system clock
720 set_time(self.fname)
721
722 if not self.support_subsecond(self.fname):
723 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700724 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200725 # On Windows, the usual resolution of time.time() is 15.6 ms.
726 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700727 #
728 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
729 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200730 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200731 st = os.stat(self.fname)
732 msg = ("st_time=%r, current=%r, dt=%r"
733 % (st.st_mtime, current, st.st_mtime - current))
734 self.assertAlmostEqual(st.st_mtime, current,
735 delta=delta, msg=msg)
736
737 def test_utime_current(self):
738 def set_time(filename):
739 # Set to the current time in the new way
740 os.utime(self.fname)
741 self._test_utime_current(set_time)
742
743 def test_utime_current_old(self):
744 def set_time(filename):
745 # Set to the current time in the old explicit way.
746 os.utime(self.fname, None)
747 self._test_utime_current(set_time)
748
749 def get_file_system(self, path):
750 if sys.platform == 'win32':
751 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
752 import ctypes
753 kernel32 = ctypes.windll.kernel32
754 buf = ctypes.create_unicode_buffer("", 100)
755 ok = kernel32.GetVolumeInformationW(root, None, 0,
756 None, None, None,
757 buf, len(buf))
758 if ok:
759 return buf.value
760 # return None if the filesystem is unknown
761
762 def test_large_time(self):
763 # Many filesystems are limited to the year 2038. At least, the test
764 # pass with NTFS filesystem.
765 if self.get_file_system(self.dirname) != "NTFS":
766 self.skipTest("requires NTFS")
767
768 large = 5000000000 # some day in 2128
769 os.utime(self.fname, (large, large))
770 self.assertEqual(os.stat(self.fname).st_mtime, large)
771
772 def test_utime_invalid_arguments(self):
773 # seconds and nanoseconds parameters are mutually exclusive
774 with self.assertRaises(ValueError):
775 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200776 with self.assertRaises(TypeError):
777 os.utime(self.fname, [5, 5])
778 with self.assertRaises(TypeError):
779 os.utime(self.fname, (5,))
780 with self.assertRaises(TypeError):
781 os.utime(self.fname, (5, 5, 5))
782 with self.assertRaises(TypeError):
783 os.utime(self.fname, ns=[5, 5])
784 with self.assertRaises(TypeError):
785 os.utime(self.fname, ns=(5,))
786 with self.assertRaises(TypeError):
787 os.utime(self.fname, ns=(5, 5, 5))
788
789 if os.utime not in os.supports_follow_symlinks:
790 with self.assertRaises(NotImplementedError):
791 os.utime(self.fname, (5, 5), follow_symlinks=False)
792 if os.utime not in os.supports_fd:
793 with open(self.fname, 'wb', 0) as fp:
794 with self.assertRaises(TypeError):
795 os.utime(fp.fileno(), (5, 5))
796 if os.utime not in os.supports_dir_fd:
797 with self.assertRaises(NotImplementedError):
798 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200799
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300800 @support.cpython_only
801 def test_issue31577(self):
802 # The interpreter shouldn't crash in case utime() received a bad
803 # ns argument.
804 def get_bad_int(divmod_ret_val):
805 class BadInt:
806 def __divmod__(*args):
807 return divmod_ret_val
808 return BadInt()
809 with self.assertRaises(TypeError):
810 os.utime(self.fname, ns=(get_bad_int(42), 1))
811 with self.assertRaises(TypeError):
812 os.utime(self.fname, ns=(get_bad_int(()), 1))
813 with self.assertRaises(TypeError):
814 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
815
Victor Stinner47aacc82015-06-12 17:26:23 +0200816
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000817from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000818
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000819class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000820 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000821 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000822
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000823 def setUp(self):
824 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000825 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000826 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000827 for key, value in self._reference().items():
828 os.environ[key] = value
829
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000830 def tearDown(self):
831 os.environ.clear()
832 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000834 os.environb.clear()
835 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000836
Christian Heimes90333392007-11-01 19:08:42 +0000837 def _reference(self):
838 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
839
840 def _empty_mapping(self):
841 os.environ.clear()
842 return os.environ
843
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000844 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200845 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
846 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000847 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000848 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300849 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200850 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300851 value = popen.read().strip()
852 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000853
Xavier de Gayed1415312016-07-22 12:15:29 +0200854 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
855 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000856 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200857 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
858 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300859 it = iter(popen)
860 self.assertEqual(next(it), "line1\n")
861 self.assertEqual(next(it), "line2\n")
862 self.assertEqual(next(it), "line3\n")
863 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000864
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000865 # Verify environ keys and values from the OS are of the
866 # correct str type.
867 def test_keyvalue_types(self):
868 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000869 self.assertEqual(type(key), str)
870 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000871
Christian Heimes90333392007-11-01 19:08:42 +0000872 def test_items(self):
873 for key, value in self._reference().items():
874 self.assertEqual(os.environ.get(key), value)
875
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000876 # Issue 7310
877 def test___repr__(self):
878 """Check that the repr() of os.environ looks like environ({...})."""
879 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000880 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
881 '{!r}: {!r}'.format(key, value)
882 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000883
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000884 def test_get_exec_path(self):
885 defpath_list = os.defpath.split(os.pathsep)
886 test_path = ['/monty', '/python', '', '/flying/circus']
887 test_env = {'PATH': os.pathsep.join(test_path)}
888
889 saved_environ = os.environ
890 try:
891 os.environ = dict(test_env)
892 # Test that defaulting to os.environ works.
893 self.assertSequenceEqual(test_path, os.get_exec_path())
894 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
895 finally:
896 os.environ = saved_environ
897
898 # No PATH environment variable
899 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
900 # Empty PATH environment variable
901 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
902 # Supplied PATH environment variable
903 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
904
Victor Stinnerb745a742010-05-18 17:17:23 +0000905 if os.supports_bytes_environ:
906 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000907 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000908 # ignore BytesWarning warning
909 with warnings.catch_warnings(record=True):
910 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000911 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000912 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000913 pass
914 else:
915 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000916
917 # bytes key and/or value
918 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
919 ['abc'])
920 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
921 ['abc'])
922 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
923 ['abc'])
924
925 @unittest.skipUnless(os.supports_bytes_environ,
926 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000927 def test_environb(self):
928 # os.environ -> os.environb
929 value = 'euro\u20ac'
930 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000931 value_bytes = value.encode(sys.getfilesystemencoding(),
932 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000933 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000934 msg = "U+20AC character is not encodable to %s" % (
935 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000936 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000937 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000938 self.assertEqual(os.environ['unicode'], value)
939 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000940
941 # os.environb -> os.environ
942 value = b'\xff'
943 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000944 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000945 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000947
Victor Stinner13ff2452018-01-22 18:32:50 +0100948 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100949 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100950 def test_unset_error(self):
951 if sys.platform == "win32":
952 # an environment variable is limited to 32,767 characters
953 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100954 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100955 else:
956 # "=" is not allowed in a variable name
957 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100958 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100959
Victor Stinner6d101392013-04-14 16:35:04 +0200960 def test_key_type(self):
961 missing = 'missingkey'
962 self.assertNotIn(missing, os.environ)
963
Victor Stinner839e5ea2013-04-14 16:43:03 +0200964 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200965 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200966 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200967 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200968
Victor Stinner839e5ea2013-04-14 16:43:03 +0200969 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200970 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200971 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200972 self.assertTrue(cm.exception.__suppress_context__)
973
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300974 def _test_environ_iteration(self, collection):
975 iterator = iter(collection)
976 new_key = "__new_key__"
977
978 next(iterator) # start iteration over os.environ.items
979
980 # add a new key in os.environ mapping
981 os.environ[new_key] = "test_environ_iteration"
982
983 try:
984 next(iterator) # force iteration over modified mapping
985 self.assertEqual(os.environ[new_key], "test_environ_iteration")
986 finally:
987 del os.environ[new_key]
988
989 def test_iter_error_when_changing_os_environ(self):
990 self._test_environ_iteration(os.environ)
991
992 def test_iter_error_when_changing_os_environ_items(self):
993 self._test_environ_iteration(os.environ.items())
994
995 def test_iter_error_when_changing_os_environ_values(self):
996 self._test_environ_iteration(os.environ.values())
997
Victor Stinner6d101392013-04-14 16:35:04 +0200998
Tim Petersc4e09402003-04-25 07:11:48 +0000999class WalkTests(unittest.TestCase):
1000 """Tests for os.walk()."""
1001
Victor Stinner0561c532015-03-12 10:28:24 +01001002 # Wrapper to hide minor differences between os.walk and os.fwalk
1003 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001004 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +02001005 if 'follow_symlinks' in kwargs:
1006 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001007 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001008
Charles-François Natali7372b062012-02-05 15:15:38 +01001009 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +01001010 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +01001011 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +00001012
1013 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +00001014 # TESTFN/
1015 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +00001016 # tmp1
1017 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +00001018 # tmp2
1019 # SUB11/ no kids
1020 # SUB2/ a file kid and a dirsymlink kid
1021 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001022 # SUB21/ not readable
1023 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +00001024 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001025 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001026 # broken_link2
1027 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +00001028 # TEST2/
1029 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +01001030 self.walk_path = join(support.TESTFN, "TEST1")
1031 self.sub1_path = join(self.walk_path, "SUB1")
1032 self.sub11_path = join(self.sub1_path, "SUB11")
1033 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001034 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +01001035 tmp1_path = join(self.walk_path, "tmp1")
1036 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +00001037 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001038 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +01001039 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001040 t2_path = join(support.TESTFN, "TEST2")
1041 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +02001042 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001043 broken_link2_path = join(sub2_path, "broken_link2")
1044 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +00001045
1046 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +01001047 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +00001048 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001049 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +00001050 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +01001051
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001052 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +01001053 with open(path, "x") as f:
1054 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001055
Victor Stinner0561c532015-03-12 10:28:24 +01001056 if support.can_symlink():
1057 os.symlink(os.path.abspath(t2_path), self.link_path)
1058 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001059 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1060 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001061 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001062 ["broken_link", "broken_link2", "broken_link3",
1063 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001064 else:
pxinwr3e028b22019-02-15 13:04:47 +08001065 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001066
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001067 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001068 try:
1069 os.listdir(sub21_path)
1070 except PermissionError:
1071 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1072 else:
1073 os.chmod(sub21_path, stat.S_IRWXU)
1074 os.unlink(tmp5_path)
1075 os.rmdir(sub21_path)
1076 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001077
Victor Stinner0561c532015-03-12 10:28:24 +01001078 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001079 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001080 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001081
Tim Petersc4e09402003-04-25 07:11:48 +00001082 self.assertEqual(len(all), 4)
1083 # We can't know which order SUB1 and SUB2 will appear in.
1084 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1085 # flipped: TESTFN, SUB2, SUB1, SUB11
1086 flipped = all[0][1][0] != "SUB1"
1087 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001088 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001089 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001090 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1091 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1092 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1093 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001094
Brett Cannon3f9183b2016-08-26 14:44:48 -07001095 def test_walk_prune(self, walk_path=None):
1096 if walk_path is None:
1097 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001098 # Prune the search.
1099 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001100 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001101 all.append((root, dirs, files))
1102 # Don't descend into SUB1.
1103 if 'SUB1' in dirs:
1104 # Note that this also mutates the dirs we appended to all!
1105 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001106
Victor Stinner0561c532015-03-12 10:28:24 +01001107 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001108 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001109
1110 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001111 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001112 self.assertEqual(all[1], self.sub2_tree)
1113
Brett Cannon3f9183b2016-08-26 14:44:48 -07001114 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001115 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001116
Victor Stinner0561c532015-03-12 10:28:24 +01001117 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001118 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001119 all = list(self.walk(self.walk_path, topdown=False))
1120
Victor Stinner53b0a412016-03-26 01:12:36 +01001121 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001122 # We can't know which order SUB1 and SUB2 will appear in.
1123 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1124 # flipped: SUB2, SUB11, SUB1, TESTFN
1125 flipped = all[3][1][0] != "SUB1"
1126 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001127 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001128 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001129 self.assertEqual(all[3],
1130 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1131 self.assertEqual(all[flipped],
1132 (self.sub11_path, [], []))
1133 self.assertEqual(all[flipped + 1],
1134 (self.sub1_path, ["SUB11"], ["tmp2"]))
1135 self.assertEqual(all[2 - 2 * flipped],
1136 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001137
Victor Stinner0561c532015-03-12 10:28:24 +01001138 def test_walk_symlink(self):
1139 if not support.can_symlink():
1140 self.skipTest("need symlink support")
1141
1142 # Walk, following symlinks.
1143 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1144 for root, dirs, files in walk_it:
1145 if root == self.link_path:
1146 self.assertEqual(dirs, [])
1147 self.assertEqual(files, ["tmp4"])
1148 break
1149 else:
1150 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001151
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001152 def test_walk_bad_dir(self):
1153 # Walk top-down.
1154 errors = []
1155 walk_it = self.walk(self.walk_path, onerror=errors.append)
1156 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001157 self.assertEqual(errors, [])
1158 dir1 = 'SUB1'
1159 path1 = os.path.join(root, dir1)
1160 path1new = os.path.join(root, dir1 + '.new')
1161 os.rename(path1, path1new)
1162 try:
1163 roots = [r for r, d, f in walk_it]
1164 self.assertTrue(errors)
1165 self.assertNotIn(path1, roots)
1166 self.assertNotIn(path1new, roots)
1167 for dir2 in dirs:
1168 if dir2 != dir1:
1169 self.assertIn(os.path.join(root, dir2), roots)
1170 finally:
1171 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001172
Charles-François Natali7372b062012-02-05 15:15:38 +01001173
1174@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1175class FwalkTests(WalkTests):
1176 """Tests for os.fwalk()."""
1177
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001178 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001179 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001180 yield (root, dirs, files)
1181
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001182 def fwalk(self, *args, **kwargs):
1183 return os.fwalk(*args, **kwargs)
1184
Larry Hastingsc48fe982012-06-25 04:49:05 -07001185 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1186 """
1187 compare with walk() results.
1188 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001189 walk_kwargs = walk_kwargs.copy()
1190 fwalk_kwargs = fwalk_kwargs.copy()
1191 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1192 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1193 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001194
Charles-François Natali7372b062012-02-05 15:15:38 +01001195 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001196 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001197 expected[root] = (set(dirs), set(files))
1198
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001199 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001200 self.assertIn(root, expected)
1201 self.assertEqual(expected[root], (set(dirs), set(files)))
1202
Larry Hastingsc48fe982012-06-25 04:49:05 -07001203 def test_compare_to_walk(self):
1204 kwargs = {'top': support.TESTFN}
1205 self._compare_to_walk(kwargs, kwargs)
1206
Charles-François Natali7372b062012-02-05 15:15:38 +01001207 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001208 try:
1209 fd = os.open(".", os.O_RDONLY)
1210 walk_kwargs = {'top': support.TESTFN}
1211 fwalk_kwargs = walk_kwargs.copy()
1212 fwalk_kwargs['dir_fd'] = fd
1213 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1214 finally:
1215 os.close(fd)
1216
1217 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001218 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001219 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1220 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001221 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001222 # check that the FD is valid
1223 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001224 # redundant check
1225 os.stat(rootfd)
1226 # check that listdir() returns consistent information
1227 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001228
1229 def test_fd_leak(self):
1230 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1231 # we both check that calling fwalk() a large number of times doesn't
1232 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1233 minfd = os.dup(1)
1234 os.close(minfd)
1235 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001236 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001237 pass
1238 newfd = os.dup(1)
1239 self.addCleanup(os.close, newfd)
1240 self.assertEqual(newfd, minfd)
1241
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001242class BytesWalkTests(WalkTests):
1243 """Tests for os.walk() with bytes."""
1244 def walk(self, top, **kwargs):
1245 if 'follow_symlinks' in kwargs:
1246 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1247 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1248 root = os.fsdecode(broot)
1249 dirs = list(map(os.fsdecode, bdirs))
1250 files = list(map(os.fsdecode, bfiles))
1251 yield (root, dirs, files)
1252 bdirs[:] = list(map(os.fsencode, dirs))
1253 bfiles[:] = list(map(os.fsencode, files))
1254
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001255@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1256class BytesFwalkTests(FwalkTests):
1257 """Tests for os.walk() with bytes."""
1258 def fwalk(self, top='.', *args, **kwargs):
1259 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1260 root = os.fsdecode(broot)
1261 dirs = list(map(os.fsdecode, bdirs))
1262 files = list(map(os.fsdecode, bfiles))
1263 yield (root, dirs, files, topfd)
1264 bdirs[:] = list(map(os.fsencode, dirs))
1265 bfiles[:] = list(map(os.fsencode, files))
1266
Charles-François Natali7372b062012-02-05 15:15:38 +01001267
Guido van Rossume7ba4952007-06-06 23:52:48 +00001268class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001270 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001271
1272 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001273 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001274 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1275 os.makedirs(path) # Should work
1276 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1277 os.makedirs(path)
1278
1279 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001280 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001281 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1282 os.makedirs(path)
1283 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1284 'dir5', 'dir6')
1285 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001286
Serhiy Storchakae304e332017-03-24 13:27:42 +02001287 def test_mode(self):
1288 with support.temp_umask(0o002):
1289 base = support.TESTFN
1290 parent = os.path.join(base, 'dir1')
1291 path = os.path.join(parent, 'dir2')
1292 os.makedirs(path, 0o555)
1293 self.assertTrue(os.path.exists(path))
1294 self.assertTrue(os.path.isdir(path))
1295 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001296 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1297 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001298
Terry Reedy5a22b652010-12-02 07:05:56 +00001299 def test_exist_ok_existing_directory(self):
1300 path = os.path.join(support.TESTFN, 'dir1')
1301 mode = 0o777
1302 old_mask = os.umask(0o022)
1303 os.makedirs(path, mode)
1304 self.assertRaises(OSError, os.makedirs, path, mode)
1305 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001306 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001307 os.makedirs(path, mode=mode, exist_ok=True)
1308 os.umask(old_mask)
1309
Martin Pantera82642f2015-11-19 04:48:44 +00001310 # Issue #25583: A drive root could raise PermissionError on Windows
1311 os.makedirs(os.path.abspath('/'), exist_ok=True)
1312
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001313 def test_exist_ok_s_isgid_directory(self):
1314 path = os.path.join(support.TESTFN, 'dir1')
1315 S_ISGID = stat.S_ISGID
1316 mode = 0o777
1317 old_mask = os.umask(0o022)
1318 try:
1319 existing_testfn_mode = stat.S_IMODE(
1320 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001321 try:
1322 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001323 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001324 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001325 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1326 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1327 # The os should apply S_ISGID from the parent dir for us, but
1328 # this test need not depend on that behavior. Be explicit.
1329 os.makedirs(path, mode | S_ISGID)
1330 # http://bugs.python.org/issue14992
1331 # Should not fail when the bit is already set.
1332 os.makedirs(path, mode, exist_ok=True)
1333 # remove the bit.
1334 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001335 # May work even when the bit is not already set when demanded.
1336 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001337 finally:
1338 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001339
1340 def test_exist_ok_existing_regular_file(self):
1341 base = support.TESTFN
1342 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001343 with open(path, 'w') as f:
1344 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001345 self.assertRaises(OSError, os.makedirs, path)
1346 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1347 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1348 os.remove(path)
1349
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001350 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001351 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001352 'dir4', 'dir5', 'dir6')
1353 # If the tests failed, the bottom-most directory ('../dir6')
1354 # may not have been created, so we look for the outermost directory
1355 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001356 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001357 path = os.path.dirname(path)
1358
1359 os.removedirs(path)
1360
Andrew Svetlov405faed2012-12-25 12:18:09 +02001361
R David Murrayf2ad1732014-12-25 18:36:56 -05001362@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1363class ChownFileTests(unittest.TestCase):
1364
Berker Peksag036a71b2015-07-21 09:29:48 +03001365 @classmethod
1366 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001367 os.mkdir(support.TESTFN)
1368
1369 def test_chown_uid_gid_arguments_must_be_index(self):
1370 stat = os.stat(support.TESTFN)
1371 uid = stat.st_uid
1372 gid = stat.st_gid
1373 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1374 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1375 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1376 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1377 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1378
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001379 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1380 def test_chown_gid(self):
1381 groups = os.getgroups()
1382 if len(groups) < 2:
1383 self.skipTest("test needs at least 2 groups")
1384
R David Murrayf2ad1732014-12-25 18:36:56 -05001385 gid_1, gid_2 = groups[:2]
1386 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001387
R David Murrayf2ad1732014-12-25 18:36:56 -05001388 os.chown(support.TESTFN, uid, gid_1)
1389 gid = os.stat(support.TESTFN).st_gid
1390 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001391
R David Murrayf2ad1732014-12-25 18:36:56 -05001392 os.chown(support.TESTFN, uid, gid_2)
1393 gid = os.stat(support.TESTFN).st_gid
1394 self.assertEqual(gid, gid_2)
1395
1396 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1397 "test needs root privilege and more than one user")
1398 def test_chown_with_root(self):
1399 uid_1, uid_2 = all_users[:2]
1400 gid = os.stat(support.TESTFN).st_gid
1401 os.chown(support.TESTFN, uid_1, gid)
1402 uid = os.stat(support.TESTFN).st_uid
1403 self.assertEqual(uid, uid_1)
1404 os.chown(support.TESTFN, uid_2, gid)
1405 uid = os.stat(support.TESTFN).st_uid
1406 self.assertEqual(uid, uid_2)
1407
1408 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1409 "test needs non-root account and more than one user")
1410 def test_chown_without_permission(self):
1411 uid_1, uid_2 = all_users[:2]
1412 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001413 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001414 os.chown(support.TESTFN, uid_1, gid)
1415 os.chown(support.TESTFN, uid_2, gid)
1416
Berker Peksag036a71b2015-07-21 09:29:48 +03001417 @classmethod
1418 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001419 os.rmdir(support.TESTFN)
1420
1421
Andrew Svetlov405faed2012-12-25 12:18:09 +02001422class RemoveDirsTests(unittest.TestCase):
1423 def setUp(self):
1424 os.makedirs(support.TESTFN)
1425
1426 def tearDown(self):
1427 support.rmtree(support.TESTFN)
1428
1429 def test_remove_all(self):
1430 dira = os.path.join(support.TESTFN, 'dira')
1431 os.mkdir(dira)
1432 dirb = os.path.join(dira, 'dirb')
1433 os.mkdir(dirb)
1434 os.removedirs(dirb)
1435 self.assertFalse(os.path.exists(dirb))
1436 self.assertFalse(os.path.exists(dira))
1437 self.assertFalse(os.path.exists(support.TESTFN))
1438
1439 def test_remove_partial(self):
1440 dira = os.path.join(support.TESTFN, 'dira')
1441 os.mkdir(dira)
1442 dirb = os.path.join(dira, 'dirb')
1443 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001444 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001445 os.removedirs(dirb)
1446 self.assertFalse(os.path.exists(dirb))
1447 self.assertTrue(os.path.exists(dira))
1448 self.assertTrue(os.path.exists(support.TESTFN))
1449
1450 def test_remove_nothing(self):
1451 dira = os.path.join(support.TESTFN, 'dira')
1452 os.mkdir(dira)
1453 dirb = os.path.join(dira, 'dirb')
1454 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001455 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001456 with self.assertRaises(OSError):
1457 os.removedirs(dirb)
1458 self.assertTrue(os.path.exists(dirb))
1459 self.assertTrue(os.path.exists(dira))
1460 self.assertTrue(os.path.exists(support.TESTFN))
1461
1462
Guido van Rossume7ba4952007-06-06 23:52:48 +00001463class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001464 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001465 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001466 f.write(b'hello')
1467 f.close()
1468 with open(os.devnull, 'rb') as f:
1469 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001470
Andrew Svetlov405faed2012-12-25 12:18:09 +02001471
Guido van Rossume7ba4952007-06-06 23:52:48 +00001472class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001473 def test_urandom_length(self):
1474 self.assertEqual(len(os.urandom(0)), 0)
1475 self.assertEqual(len(os.urandom(1)), 1)
1476 self.assertEqual(len(os.urandom(10)), 10)
1477 self.assertEqual(len(os.urandom(100)), 100)
1478 self.assertEqual(len(os.urandom(1000)), 1000)
1479
1480 def test_urandom_value(self):
1481 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001482 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001483 data2 = os.urandom(16)
1484 self.assertNotEqual(data1, data2)
1485
1486 def get_urandom_subprocess(self, count):
1487 code = '\n'.join((
1488 'import os, sys',
1489 'data = os.urandom(%s)' % count,
1490 'sys.stdout.buffer.write(data)',
1491 'sys.stdout.buffer.flush()'))
1492 out = assert_python_ok('-c', code)
1493 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001494 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001495 return stdout
1496
1497 def test_urandom_subprocess(self):
1498 data1 = self.get_urandom_subprocess(16)
1499 data2 = self.get_urandom_subprocess(16)
1500 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001501
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001502
Victor Stinner9b1f4742016-09-06 16:18:52 -07001503@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1504class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001505 @classmethod
1506 def setUpClass(cls):
1507 try:
1508 os.getrandom(1)
1509 except OSError as exc:
1510 if exc.errno == errno.ENOSYS:
1511 # Python compiled on a more recent Linux version
1512 # than the current Linux kernel
1513 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1514 else:
1515 raise
1516
Victor Stinner9b1f4742016-09-06 16:18:52 -07001517 def test_getrandom_type(self):
1518 data = os.getrandom(16)
1519 self.assertIsInstance(data, bytes)
1520 self.assertEqual(len(data), 16)
1521
1522 def test_getrandom0(self):
1523 empty = os.getrandom(0)
1524 self.assertEqual(empty, b'')
1525
1526 def test_getrandom_random(self):
1527 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1528
1529 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1530 # resource /dev/random
1531
1532 def test_getrandom_nonblock(self):
1533 # The call must not fail. Check also that the flag exists
1534 try:
1535 os.getrandom(1, os.GRND_NONBLOCK)
1536 except BlockingIOError:
1537 # System urandom is not initialized yet
1538 pass
1539
1540 def test_getrandom_value(self):
1541 data1 = os.getrandom(16)
1542 data2 = os.getrandom(16)
1543 self.assertNotEqual(data1, data2)
1544
1545
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001546# os.urandom() doesn't use a file descriptor when it is implemented with the
1547# getentropy() function, the getrandom() function or the getrandom() syscall
1548OS_URANDOM_DONT_USE_FD = (
1549 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1550 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1551 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001552
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001553@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1554 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001555@unittest.skipIf(sys.platform == "vxworks",
1556 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001557class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001558 @unittest.skipUnless(resource, "test requires the resource module")
1559 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001560 # Check urandom() failing when it is not able to open /dev/random.
1561 # We spawn a new process to make the test more robust (if getrlimit()
1562 # failed to restore the file descriptor limit after this, the whole
1563 # test suite would crash; this actually happened on the OS X Tiger
1564 # buildbot).
1565 code = """if 1:
1566 import errno
1567 import os
1568 import resource
1569
1570 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1571 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1572 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001573 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001574 except OSError as e:
1575 assert e.errno == errno.EMFILE, e.errno
1576 else:
1577 raise AssertionError("OSError not raised")
1578 """
1579 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001580
Antoine Pitroue472aea2014-04-26 14:33:03 +02001581 def test_urandom_fd_closed(self):
1582 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1583 # closed.
1584 code = """if 1:
1585 import os
1586 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001587 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001588 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001589 with test.support.SuppressCrashReport():
1590 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001591 sys.stdout.buffer.write(os.urandom(4))
1592 """
1593 rc, out, err = assert_python_ok('-Sc', code)
1594
1595 def test_urandom_fd_reopened(self):
1596 # Issue #21207: urandom() should detect its fd to /dev/urandom
1597 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001598 self.addCleanup(support.unlink, support.TESTFN)
1599 create_file(support.TESTFN, b"x" * 256)
1600
Antoine Pitroue472aea2014-04-26 14:33:03 +02001601 code = """if 1:
1602 import os
1603 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001604 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001605 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001606 with test.support.SuppressCrashReport():
1607 for fd in range(3, 256):
1608 try:
1609 os.close(fd)
1610 except OSError:
1611 pass
1612 else:
1613 # Found the urandom fd (XXX hopefully)
1614 break
1615 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001616 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001617 new_fd = f.fileno()
1618 # Issue #26935: posix allows new_fd and fd to be equal but
1619 # some libc implementations have dup2 return an error in this
1620 # case.
1621 if new_fd != fd:
1622 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001623 sys.stdout.buffer.write(os.urandom(4))
1624 sys.stdout.buffer.write(os.urandom(4))
1625 """.format(TESTFN=support.TESTFN)
1626 rc, out, err = assert_python_ok('-Sc', code)
1627 self.assertEqual(len(out), 8)
1628 self.assertNotEqual(out[0:4], out[4:8])
1629 rc, out2, err2 = assert_python_ok('-Sc', code)
1630 self.assertEqual(len(out2), 8)
1631 self.assertNotEqual(out2, out)
1632
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001633
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001634@contextlib.contextmanager
1635def _execvpe_mockup(defpath=None):
1636 """
1637 Stubs out execv and execve functions when used as context manager.
1638 Records exec calls. The mock execv and execve functions always raise an
1639 exception as they would normally never return.
1640 """
1641 # A list of tuples containing (function name, first arg, args)
1642 # of calls to execv or execve that have been made.
1643 calls = []
1644
1645 def mock_execv(name, *args):
1646 calls.append(('execv', name, args))
1647 raise RuntimeError("execv called")
1648
1649 def mock_execve(name, *args):
1650 calls.append(('execve', name, args))
1651 raise OSError(errno.ENOTDIR, "execve called")
1652
1653 try:
1654 orig_execv = os.execv
1655 orig_execve = os.execve
1656 orig_defpath = os.defpath
1657 os.execv = mock_execv
1658 os.execve = mock_execve
1659 if defpath is not None:
1660 os.defpath = defpath
1661 yield calls
1662 finally:
1663 os.execv = orig_execv
1664 os.execve = orig_execve
1665 os.defpath = orig_defpath
1666
pxinwrf2d7ac72019-05-21 18:46:37 +08001667@unittest.skipUnless(hasattr(os, 'execv'),
1668 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001669class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001670 @unittest.skipIf(USING_LINUXTHREADS,
1671 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001672 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001673 self.assertRaises(OSError, os.execvpe, 'no such app-',
1674 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001675
Steve Dowerbce26262016-11-19 19:17:26 -08001676 def test_execv_with_bad_arglist(self):
1677 self.assertRaises(ValueError, os.execv, 'notepad', ())
1678 self.assertRaises(ValueError, os.execv, 'notepad', [])
1679 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1680 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1681
Thomas Heller6790d602007-08-30 17:15:14 +00001682 def test_execvpe_with_bad_arglist(self):
1683 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001684 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1685 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001686
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001687 @unittest.skipUnless(hasattr(os, '_execvpe'),
1688 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001689 def _test_internal_execvpe(self, test_type):
1690 program_path = os.sep + 'absolutepath'
1691 if test_type is bytes:
1692 program = b'executable'
1693 fullpath = os.path.join(os.fsencode(program_path), program)
1694 native_fullpath = fullpath
1695 arguments = [b'progname', 'arg1', 'arg2']
1696 else:
1697 program = 'executable'
1698 arguments = ['progname', 'arg1', 'arg2']
1699 fullpath = os.path.join(program_path, program)
1700 if os.name != "nt":
1701 native_fullpath = os.fsencode(fullpath)
1702 else:
1703 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001704 env = {'spam': 'beans'}
1705
Victor Stinnerb745a742010-05-18 17:17:23 +00001706 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001707 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001708 self.assertRaises(RuntimeError,
1709 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001710 self.assertEqual(len(calls), 1)
1711 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1712
Victor Stinnerb745a742010-05-18 17:17:23 +00001713 # test os._execvpe() with a relative path:
1714 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001715 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001716 self.assertRaises(OSError,
1717 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001718 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001719 self.assertSequenceEqual(calls[0],
1720 ('execve', native_fullpath, (arguments, env)))
1721
1722 # test os._execvpe() with a relative path:
1723 # os.get_exec_path() reads the 'PATH' variable
1724 with _execvpe_mockup() as calls:
1725 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001726 if test_type is bytes:
1727 env_path[b'PATH'] = program_path
1728 else:
1729 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001730 self.assertRaises(OSError,
1731 os._execvpe, program, arguments, env=env_path)
1732 self.assertEqual(len(calls), 1)
1733 self.assertSequenceEqual(calls[0],
1734 ('execve', native_fullpath, (arguments, env_path)))
1735
1736 def test_internal_execvpe_str(self):
1737 self._test_internal_execvpe(str)
1738 if os.name != "nt":
1739 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001740
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001741 def test_execve_invalid_env(self):
1742 args = [sys.executable, '-c', 'pass']
1743
Ville Skyttä49b27342017-08-03 09:00:59 +03001744 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001745 newenv = os.environ.copy()
1746 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1747 with self.assertRaises(ValueError):
1748 os.execve(args[0], args, newenv)
1749
Ville Skyttä49b27342017-08-03 09:00:59 +03001750 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001751 newenv = os.environ.copy()
1752 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1753 with self.assertRaises(ValueError):
1754 os.execve(args[0], args, newenv)
1755
Ville Skyttä49b27342017-08-03 09:00:59 +03001756 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001757 newenv = os.environ.copy()
1758 newenv["FRUIT=ORANGE"] = "lemon"
1759 with self.assertRaises(ValueError):
1760 os.execve(args[0], args, newenv)
1761
Alexey Izbyshev83460312018-10-20 03:28:22 +03001762 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1763 def test_execve_with_empty_path(self):
1764 # bpo-32890: Check GetLastError() misuse
1765 try:
1766 os.execve('', ['arg'], {})
1767 except OSError as e:
1768 self.assertTrue(e.winerror is None or e.winerror != 0)
1769 else:
1770 self.fail('No OSError raised')
1771
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001772
Serhiy Storchaka43767632013-11-03 21:31:38 +02001773@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001774class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001775 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001776 try:
1777 os.stat(support.TESTFN)
1778 except FileNotFoundError:
1779 exists = False
1780 except OSError as exc:
1781 exists = True
1782 self.fail("file %s must not exist; os.stat failed with %s"
1783 % (support.TESTFN, exc))
1784 else:
1785 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001786
Thomas Wouters477c8d52006-05-27 19:21:47 +00001787 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001788 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001789
1790 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001791 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001792
1793 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001794 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001795
1796 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001797 self.addCleanup(support.unlink, support.TESTFN)
1798
Victor Stinnere77c9742016-03-25 10:28:23 +01001799 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001800 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001801
1802 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001803 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001804
Thomas Wouters477c8d52006-05-27 19:21:47 +00001805 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001806 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001807
Victor Stinnere77c9742016-03-25 10:28:23 +01001808
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001809class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001810 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001811 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1812 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001813 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001814 def get_single(f):
1815 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001816 if hasattr(os, f):
1817 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001818 return helper
1819 for f in singles:
1820 locals()["test_"+f] = get_single(f)
1821
Benjamin Peterson7522c742009-01-19 21:00:09 +00001822 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001823 try:
1824 f(support.make_bad_fd(), *args)
1825 except OSError as e:
1826 self.assertEqual(e.errno, errno.EBADF)
1827 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001828 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001829 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001830
Serhiy Storchaka43767632013-11-03 21:31:38 +02001831 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001832 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001833 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001834
Serhiy Storchaka43767632013-11-03 21:31:38 +02001835 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001836 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001837 fd = support.make_bad_fd()
1838 # Make sure none of the descriptors we are about to close are
1839 # currently valid (issue 6542).
1840 for i in range(10):
1841 try: os.fstat(fd+i)
1842 except OSError:
1843 pass
1844 else:
1845 break
1846 if i < 2:
1847 raise unittest.SkipTest(
1848 "Unable to acquire a range of invalid file descriptors")
1849 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001850
Serhiy Storchaka43767632013-11-03 21:31:38 +02001851 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001852 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001853 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001854
Serhiy Storchaka43767632013-11-03 21:31:38 +02001855 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001856 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001857 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001858
Serhiy Storchaka43767632013-11-03 21:31:38 +02001859 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001860 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001861 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001862
Serhiy Storchaka43767632013-11-03 21:31:38 +02001863 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001864 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865 self.check(os.pathconf, "PC_NAME_MAX")
1866 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001867
Serhiy Storchaka43767632013-11-03 21:31:38 +02001868 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001869 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001870 self.check(os.truncate, 0)
1871 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001872
Serhiy Storchaka43767632013-11-03 21:31:38 +02001873 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001874 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001875 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001876
Serhiy Storchaka43767632013-11-03 21:31:38 +02001877 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001878 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001879 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001880
Victor Stinner57ddf782014-01-08 15:21:28 +01001881 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1882 def test_readv(self):
1883 buf = bytearray(10)
1884 self.check(os.readv, [buf])
1885
Serhiy Storchaka43767632013-11-03 21:31:38 +02001886 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001887 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001888 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001889
Serhiy Storchaka43767632013-11-03 21:31:38 +02001890 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001891 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001892 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001893
Victor Stinner57ddf782014-01-08 15:21:28 +01001894 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1895 def test_writev(self):
1896 self.check(os.writev, [b'abc'])
1897
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001898 def test_inheritable(self):
1899 self.check(os.get_inheritable)
1900 self.check(os.set_inheritable, True)
1901
1902 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1903 'needs os.get_blocking() and os.set_blocking()')
1904 def test_blocking(self):
1905 self.check(os.get_blocking)
1906 self.check(os.set_blocking, True)
1907
Brian Curtin1b9df392010-11-24 20:24:31 +00001908
1909class LinkTests(unittest.TestCase):
1910 def setUp(self):
1911 self.file1 = support.TESTFN
1912 self.file2 = os.path.join(support.TESTFN + "2")
1913
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001914 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001915 for file in (self.file1, self.file2):
1916 if os.path.exists(file):
1917 os.unlink(file)
1918
Brian Curtin1b9df392010-11-24 20:24:31 +00001919 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001920 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001921
xdegaye6a55d092017-11-12 17:57:04 +01001922 try:
1923 os.link(file1, file2)
1924 except PermissionError as e:
1925 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001926 with open(file1, "r") as f1, open(file2, "r") as f2:
1927 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1928
1929 def test_link(self):
1930 self._test_link(self.file1, self.file2)
1931
1932 def test_link_bytes(self):
1933 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1934 bytes(self.file2, sys.getfilesystemencoding()))
1935
Brian Curtinf498b752010-11-30 15:54:04 +00001936 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001937 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001938 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001939 except UnicodeError:
1940 raise unittest.SkipTest("Unable to encode for this platform.")
1941
Brian Curtinf498b752010-11-30 15:54:04 +00001942 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001943 self.file2 = self.file1 + "2"
1944 self._test_link(self.file1, self.file2)
1945
Serhiy Storchaka43767632013-11-03 21:31:38 +02001946@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1947class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001948 # uid_t and gid_t are 32-bit unsigned integers on Linux
1949 UID_OVERFLOW = (1 << 32)
1950 GID_OVERFLOW = (1 << 32)
1951
Serhiy Storchaka43767632013-11-03 21:31:38 +02001952 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1953 def test_setuid(self):
1954 if os.getuid() != 0:
1955 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001956 self.assertRaises(TypeError, os.setuid, 'not an int')
1957 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001958
Serhiy Storchaka43767632013-11-03 21:31:38 +02001959 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1960 def test_setgid(self):
1961 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1962 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001963 self.assertRaises(TypeError, os.setgid, 'not an int')
1964 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001965
Serhiy Storchaka43767632013-11-03 21:31:38 +02001966 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1967 def test_seteuid(self):
1968 if os.getuid() != 0:
1969 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001970 self.assertRaises(TypeError, os.setegid, 'not an int')
1971 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001972
Serhiy Storchaka43767632013-11-03 21:31:38 +02001973 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1974 def test_setegid(self):
1975 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1976 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001977 self.assertRaises(TypeError, os.setegid, 'not an int')
1978 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001979
Serhiy Storchaka43767632013-11-03 21:31:38 +02001980 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1981 def test_setreuid(self):
1982 if os.getuid() != 0:
1983 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001984 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1985 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1986 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1987 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001988
Serhiy Storchaka43767632013-11-03 21:31:38 +02001989 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1990 def test_setreuid_neg1(self):
1991 # Needs to accept -1. We run this in a subprocess to avoid
1992 # altering the test runner's process state (issue8045).
1993 subprocess.check_call([
1994 sys.executable, '-c',
1995 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001996
Serhiy Storchaka43767632013-11-03 21:31:38 +02001997 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1998 def test_setregid(self):
1999 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2000 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01002001 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2002 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2003 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2004 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002005
Serhiy Storchaka43767632013-11-03 21:31:38 +02002006 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2007 def test_setregid_neg1(self):
2008 # Needs to accept -1. We run this in a subprocess to avoid
2009 # altering the test runner's process state (issue8045).
2010 subprocess.check_call([
2011 sys.executable, '-c',
2012 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002013
Serhiy Storchaka43767632013-11-03 21:31:38 +02002014@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2015class Pep383Tests(unittest.TestCase):
2016 def setUp(self):
2017 if support.TESTFN_UNENCODABLE:
2018 self.dir = support.TESTFN_UNENCODABLE
2019 elif support.TESTFN_NONASCII:
2020 self.dir = support.TESTFN_NONASCII
2021 else:
2022 self.dir = support.TESTFN
2023 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002024
Serhiy Storchaka43767632013-11-03 21:31:38 +02002025 bytesfn = []
2026 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00002027 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002028 fn = os.fsencode(fn)
2029 except UnicodeEncodeError:
2030 return
2031 bytesfn.append(fn)
2032 add_filename(support.TESTFN_UNICODE)
2033 if support.TESTFN_UNENCODABLE:
2034 add_filename(support.TESTFN_UNENCODABLE)
2035 if support.TESTFN_NONASCII:
2036 add_filename(support.TESTFN_NONASCII)
2037 if not bytesfn:
2038 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00002039
Serhiy Storchaka43767632013-11-03 21:31:38 +02002040 self.unicodefn = set()
2041 os.mkdir(self.dir)
2042 try:
2043 for fn in bytesfn:
2044 support.create_empty_file(os.path.join(self.bdir, fn))
2045 fn = os.fsdecode(fn)
2046 if fn in self.unicodefn:
2047 raise ValueError("duplicate filename")
2048 self.unicodefn.add(fn)
2049 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00002050 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002051 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00002052
Serhiy Storchaka43767632013-11-03 21:31:38 +02002053 def tearDown(self):
2054 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002055
Serhiy Storchaka43767632013-11-03 21:31:38 +02002056 def test_listdir(self):
2057 expected = self.unicodefn
2058 found = set(os.listdir(self.dir))
2059 self.assertEqual(found, expected)
2060 # test listdir without arguments
2061 current_directory = os.getcwd()
2062 try:
2063 os.chdir(os.sep)
2064 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2065 finally:
2066 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002067
Serhiy Storchaka43767632013-11-03 21:31:38 +02002068 def test_open(self):
2069 for fn in self.unicodefn:
2070 f = open(os.path.join(self.dir, fn), 'rb')
2071 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002072
Serhiy Storchaka43767632013-11-03 21:31:38 +02002073 @unittest.skipUnless(hasattr(os, 'statvfs'),
2074 "need os.statvfs()")
2075 def test_statvfs(self):
2076 # issue #9645
2077 for fn in self.unicodefn:
2078 # should not fail with file not found error
2079 fullname = os.path.join(self.dir, fn)
2080 os.statvfs(fullname)
2081
2082 def test_stat(self):
2083 for fn in self.unicodefn:
2084 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002085
Brian Curtineb24d742010-04-12 17:16:38 +00002086@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2087class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002088 def _kill(self, sig):
2089 # Start sys.executable as a subprocess and communicate from the
2090 # subprocess to the parent that the interpreter is ready. When it
2091 # becomes ready, send *sig* via os.kill to the subprocess and check
2092 # that the return code is equal to *sig*.
2093 import ctypes
2094 from ctypes import wintypes
2095 import msvcrt
2096
2097 # Since we can't access the contents of the process' stdout until the
2098 # process has exited, use PeekNamedPipe to see what's inside stdout
2099 # without waiting. This is done so we can tell that the interpreter
2100 # is started and running at a point where it could handle a signal.
2101 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2102 PeekNamedPipe.restype = wintypes.BOOL
2103 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2104 ctypes.POINTER(ctypes.c_char), # stdout buf
2105 wintypes.DWORD, # Buffer size
2106 ctypes.POINTER(wintypes.DWORD), # bytes read
2107 ctypes.POINTER(wintypes.DWORD), # bytes avail
2108 ctypes.POINTER(wintypes.DWORD)) # bytes left
2109 msg = "running"
2110 proc = subprocess.Popen([sys.executable, "-c",
2111 "import sys;"
2112 "sys.stdout.write('{}');"
2113 "sys.stdout.flush();"
2114 "input()".format(msg)],
2115 stdout=subprocess.PIPE,
2116 stderr=subprocess.PIPE,
2117 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002118 self.addCleanup(proc.stdout.close)
2119 self.addCleanup(proc.stderr.close)
2120 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002121
2122 count, max = 0, 100
2123 while count < max and proc.poll() is None:
2124 # Create a string buffer to store the result of stdout from the pipe
2125 buf = ctypes.create_string_buffer(len(msg))
2126 # Obtain the text currently in proc.stdout
2127 # Bytes read/avail/left are left as NULL and unused
2128 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2129 buf, ctypes.sizeof(buf), None, None, None)
2130 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2131 if buf.value:
2132 self.assertEqual(msg, buf.value.decode())
2133 break
2134 time.sleep(0.1)
2135 count += 1
2136 else:
2137 self.fail("Did not receive communication from the subprocess")
2138
Brian Curtineb24d742010-04-12 17:16:38 +00002139 os.kill(proc.pid, sig)
2140 self.assertEqual(proc.wait(), sig)
2141
2142 def test_kill_sigterm(self):
2143 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002144 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002145
2146 def test_kill_int(self):
2147 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002148 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002149
2150 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002151 tagname = "test_os_%s" % uuid.uuid1()
2152 m = mmap.mmap(-1, 1, tagname)
2153 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002154 # Run a script which has console control handling enabled.
2155 proc = subprocess.Popen([sys.executable,
2156 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002157 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002158 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2159 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002160 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002161 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002162 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002163 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002164 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002165 count += 1
2166 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002167 # Forcefully kill the process if we weren't able to signal it.
2168 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002169 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002170 os.kill(proc.pid, event)
2171 # proc.send_signal(event) could also be done here.
2172 # Allow time for the signal to be passed and the process to exit.
2173 time.sleep(0.5)
2174 if not proc.poll():
2175 # Forcefully kill the process if we weren't able to signal it.
2176 os.kill(proc.pid, signal.SIGINT)
2177 self.fail("subprocess did not stop on {}".format(name))
2178
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002179 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002180 def test_CTRL_C_EVENT(self):
2181 from ctypes import wintypes
2182 import ctypes
2183
2184 # Make a NULL value by creating a pointer with no argument.
2185 NULL = ctypes.POINTER(ctypes.c_int)()
2186 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2187 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2188 wintypes.BOOL)
2189 SetConsoleCtrlHandler.restype = wintypes.BOOL
2190
2191 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002192 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002193 # by subprocesses.
2194 SetConsoleCtrlHandler(NULL, 0)
2195
2196 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2197
2198 def test_CTRL_BREAK_EVENT(self):
2199 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2200
2201
Brian Curtind40e6f72010-07-08 21:39:08 +00002202@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002203class Win32ListdirTests(unittest.TestCase):
2204 """Test listdir on Windows."""
2205
2206 def setUp(self):
2207 self.created_paths = []
2208 for i in range(2):
2209 dir_name = 'SUB%d' % i
2210 dir_path = os.path.join(support.TESTFN, dir_name)
2211 file_name = 'FILE%d' % i
2212 file_path = os.path.join(support.TESTFN, file_name)
2213 os.makedirs(dir_path)
2214 with open(file_path, 'w') as f:
2215 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2216 self.created_paths.extend([dir_name, file_name])
2217 self.created_paths.sort()
2218
2219 def tearDown(self):
2220 shutil.rmtree(support.TESTFN)
2221
2222 def test_listdir_no_extended_path(self):
2223 """Test when the path is not an "extended" path."""
2224 # unicode
2225 self.assertEqual(
2226 sorted(os.listdir(support.TESTFN)),
2227 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002228
Tim Golden781bbeb2013-10-25 20:24:06 +01002229 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002230 self.assertEqual(
2231 sorted(os.listdir(os.fsencode(support.TESTFN))),
2232 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002233
2234 def test_listdir_extended_path(self):
2235 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002236 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002237 # unicode
2238 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2239 self.assertEqual(
2240 sorted(os.listdir(path)),
2241 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002242
Tim Golden781bbeb2013-10-25 20:24:06 +01002243 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002244 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2245 self.assertEqual(
2246 sorted(os.listdir(path)),
2247 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002248
2249
Berker Peksage0b5b202018-08-15 13:03:41 +03002250@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2251class ReadlinkTests(unittest.TestCase):
2252 filelink = 'readlinktest'
2253 filelink_target = os.path.abspath(__file__)
2254 filelinkb = os.fsencode(filelink)
2255 filelinkb_target = os.fsencode(filelink_target)
2256
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002257 def assertPathEqual(self, left, right):
2258 left = os.path.normcase(left)
2259 right = os.path.normcase(right)
2260 if sys.platform == 'win32':
2261 # Bad practice to blindly strip the prefix as it may be required to
2262 # correctly refer to the file, but we're only comparing paths here.
2263 has_prefix = lambda p: p.startswith(
2264 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2265 if has_prefix(left):
2266 left = left[4:]
2267 if has_prefix(right):
2268 right = right[4:]
2269 self.assertEqual(left, right)
2270
Berker Peksage0b5b202018-08-15 13:03:41 +03002271 def setUp(self):
2272 self.assertTrue(os.path.exists(self.filelink_target))
2273 self.assertTrue(os.path.exists(self.filelinkb_target))
2274 self.assertFalse(os.path.exists(self.filelink))
2275 self.assertFalse(os.path.exists(self.filelinkb))
2276
2277 def test_not_symlink(self):
2278 filelink_target = FakePath(self.filelink_target)
2279 self.assertRaises(OSError, os.readlink, self.filelink_target)
2280 self.assertRaises(OSError, os.readlink, filelink_target)
2281
2282 def test_missing_link(self):
2283 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2284 self.assertRaises(FileNotFoundError, os.readlink,
2285 FakePath('missing-link'))
2286
2287 @support.skip_unless_symlink
2288 def test_pathlike(self):
2289 os.symlink(self.filelink_target, self.filelink)
2290 self.addCleanup(support.unlink, self.filelink)
2291 filelink = FakePath(self.filelink)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002292 self.assertPathEqual(os.readlink(filelink), self.filelink_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002293
2294 @support.skip_unless_symlink
2295 def test_pathlike_bytes(self):
2296 os.symlink(self.filelinkb_target, self.filelinkb)
2297 self.addCleanup(support.unlink, self.filelinkb)
2298 path = os.readlink(FakePath(self.filelinkb))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002299 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002300 self.assertIsInstance(path, bytes)
2301
2302 @support.skip_unless_symlink
2303 def test_bytes(self):
2304 os.symlink(self.filelinkb_target, self.filelinkb)
2305 self.addCleanup(support.unlink, self.filelinkb)
2306 path = os.readlink(self.filelinkb)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002307 self.assertPathEqual(path, self.filelinkb_target)
Berker Peksage0b5b202018-08-15 13:03:41 +03002308 self.assertIsInstance(path, bytes)
2309
2310
Tim Golden781bbeb2013-10-25 20:24:06 +01002311@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002312@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002313class Win32SymlinkTests(unittest.TestCase):
2314 filelink = 'filelinktest'
2315 filelink_target = os.path.abspath(__file__)
2316 dirlink = 'dirlinktest'
2317 dirlink_target = os.path.dirname(filelink_target)
2318 missing_link = 'missing link'
2319
2320 def setUp(self):
2321 assert os.path.exists(self.dirlink_target)
2322 assert os.path.exists(self.filelink_target)
2323 assert not os.path.exists(self.dirlink)
2324 assert not os.path.exists(self.filelink)
2325 assert not os.path.exists(self.missing_link)
2326
2327 def tearDown(self):
2328 if os.path.exists(self.filelink):
2329 os.remove(self.filelink)
2330 if os.path.exists(self.dirlink):
2331 os.rmdir(self.dirlink)
2332 if os.path.lexists(self.missing_link):
2333 os.remove(self.missing_link)
2334
2335 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002336 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002337 self.assertTrue(os.path.exists(self.dirlink))
2338 self.assertTrue(os.path.isdir(self.dirlink))
2339 self.assertTrue(os.path.islink(self.dirlink))
2340 self.check_stat(self.dirlink, self.dirlink_target)
2341
2342 def test_file_link(self):
2343 os.symlink(self.filelink_target, self.filelink)
2344 self.assertTrue(os.path.exists(self.filelink))
2345 self.assertTrue(os.path.isfile(self.filelink))
2346 self.assertTrue(os.path.islink(self.filelink))
2347 self.check_stat(self.filelink, self.filelink_target)
2348
2349 def _create_missing_dir_link(self):
2350 'Create a "directory" link to a non-existent target'
2351 linkname = self.missing_link
2352 if os.path.lexists(linkname):
2353 os.remove(linkname)
2354 target = r'c:\\target does not exist.29r3c740'
2355 assert not os.path.exists(target)
2356 target_is_dir = True
2357 os.symlink(target, linkname, target_is_dir)
2358
2359 def test_remove_directory_link_to_missing_target(self):
2360 self._create_missing_dir_link()
2361 # For compatibility with Unix, os.remove will check the
2362 # directory status and call RemoveDirectory if the symlink
2363 # was created with target_is_dir==True.
2364 os.remove(self.missing_link)
2365
Brian Curtind40e6f72010-07-08 21:39:08 +00002366 def test_isdir_on_directory_link_to_missing_target(self):
2367 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002368 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002369
Brian Curtind40e6f72010-07-08 21:39:08 +00002370 def test_rmdir_on_directory_link_to_missing_target(self):
2371 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002372 os.rmdir(self.missing_link)
2373
2374 def check_stat(self, link, target):
2375 self.assertEqual(os.stat(link), os.stat(target))
2376 self.assertNotEqual(os.lstat(link), os.stat(link))
2377
Brian Curtind25aef52011-06-13 15:16:04 -05002378 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002379 self.assertEqual(os.stat(bytes_link), os.stat(target))
2380 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002381
2382 def test_12084(self):
2383 level1 = os.path.abspath(support.TESTFN)
2384 level2 = os.path.join(level1, "level2")
2385 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002386 self.addCleanup(support.rmtree, level1)
2387
2388 os.mkdir(level1)
2389 os.mkdir(level2)
2390 os.mkdir(level3)
2391
2392 file1 = os.path.abspath(os.path.join(level1, "file1"))
2393 create_file(file1)
2394
2395 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002396 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002397 os.chdir(level2)
2398 link = os.path.join(level2, "link")
2399 os.symlink(os.path.relpath(file1), "link")
2400 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002401
Victor Stinnerae39d232016-03-24 17:12:55 +01002402 # Check os.stat calls from the same dir as the link
2403 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002404
Victor Stinnerae39d232016-03-24 17:12:55 +01002405 # Check os.stat calls from a dir below the link
2406 os.chdir(level1)
2407 self.assertEqual(os.stat(file1),
2408 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002409
Victor Stinnerae39d232016-03-24 17:12:55 +01002410 # Check os.stat calls from a dir above the link
2411 os.chdir(level3)
2412 self.assertEqual(os.stat(file1),
2413 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002414 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002415 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002416
SSE43c34aad2018-02-13 00:10:35 +07002417 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2418 and os.path.exists(r'C:\ProgramData'),
2419 'Test directories not found')
2420 def test_29248(self):
2421 # os.symlink() calls CreateSymbolicLink, which creates
2422 # the reparse data buffer with the print name stored
2423 # first, so the offset is always 0. CreateSymbolicLink
2424 # stores the "PrintName" DOS path (e.g. "C:\") first,
2425 # with an offset of 0, followed by the "SubstituteName"
2426 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2427 # the other hand, seems to have been created manually
2428 # with an inverted order.
2429 target = os.readlink(r'C:\Users\All Users')
2430 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2431
Steve Dower6921e732018-03-05 14:26:08 -08002432 def test_buffer_overflow(self):
2433 # Older versions would have a buffer overflow when detecting
2434 # whether a link source was a directory. This test ensures we
2435 # no longer crash, but does not otherwise validate the behavior
2436 segment = 'X' * 27
2437 path = os.path.join(*[segment] * 10)
2438 test_cases = [
2439 # overflow with absolute src
2440 ('\\' + path, segment),
2441 # overflow dest with relative src
2442 (segment, path),
2443 # overflow when joining src
2444 (path[:180], path[:180]),
2445 ]
2446 for src, dest in test_cases:
2447 try:
2448 os.symlink(src, dest)
2449 except FileNotFoundError:
2450 pass
2451 else:
2452 try:
2453 os.remove(dest)
2454 except OSError:
2455 pass
2456 # Also test with bytes, since that is a separate code path.
2457 try:
2458 os.symlink(os.fsencode(src), os.fsencode(dest))
2459 except FileNotFoundError:
2460 pass
2461 else:
2462 try:
2463 os.remove(dest)
2464 except OSError:
2465 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002466
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002467 def test_appexeclink(self):
2468 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002469 if not os.path.isdir(root):
2470 self.skipTest("test requires a WindowsApps directory")
2471
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002472 aliases = [os.path.join(root, a)
2473 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2474
2475 for alias in aliases:
2476 if support.verbose:
2477 print()
2478 print("Testing with", alias)
2479 st = os.lstat(alias)
2480 self.assertEqual(st, os.stat(alias))
2481 self.assertFalse(stat.S_ISLNK(st.st_mode))
2482 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2483 # testing the first one we see is sufficient
2484 break
2485 else:
2486 self.skipTest("test requires an app execution alias")
2487
Tim Golden0321cf22014-05-05 19:46:17 +01002488@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2489class Win32JunctionTests(unittest.TestCase):
2490 junction = 'junctiontest'
2491 junction_target = os.path.dirname(os.path.abspath(__file__))
2492
2493 def setUp(self):
2494 assert os.path.exists(self.junction_target)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002495 assert not os.path.lexists(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002496
2497 def tearDown(self):
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002498 if os.path.lexists(self.junction):
2499 os.unlink(self.junction)
Tim Golden0321cf22014-05-05 19:46:17 +01002500
2501 def test_create_junction(self):
2502 _winapi.CreateJunction(self.junction_target, self.junction)
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002503 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002504 self.assertTrue(os.path.exists(self.junction))
2505 self.assertTrue(os.path.isdir(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002506 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2507 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
Tim Golden0321cf22014-05-05 19:46:17 +01002508
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002509 # bpo-37834: Junctions are not recognized as links.
Tim Golden0321cf22014-05-05 19:46:17 +01002510 self.assertFalse(os.path.islink(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002511 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2512 os.path.normcase(os.readlink(self.junction)))
Tim Golden0321cf22014-05-05 19:46:17 +01002513
2514 def test_unlink_removes_junction(self):
2515 _winapi.CreateJunction(self.junction_target, self.junction)
2516 self.assertTrue(os.path.exists(self.junction))
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002517 self.assertTrue(os.path.lexists(self.junction))
Tim Golden0321cf22014-05-05 19:46:17 +01002518
2519 os.unlink(self.junction)
2520 self.assertFalse(os.path.exists(self.junction))
2521
Mark Becwarb82bfac2019-02-02 16:08:23 -05002522@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2523class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002524 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002525 nt = support.import_module('nt')
2526 ctypes = support.import_module('ctypes')
2527 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002528
2529 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2530 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2531
2532 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2533 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2534 ctypes.wintypes.LPDWORD)
2535
2536 # This is a pseudo-handle that doesn't need to be closed
2537 hproc = kernel.GetCurrentProcess()
2538
2539 handle_count = ctypes.wintypes.DWORD()
2540 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2541 self.assertEqual(1, ok)
2542
2543 before_count = handle_count.value
2544
2545 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002546 filenames = [
2547 r'\\?\C:',
2548 r'\\?\NUL',
2549 r'\\?\CONIN',
2550 __file__,
2551 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002552
Berker Peksag6ef726a2019-04-22 18:46:28 +03002553 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002554 for name in filenames:
2555 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002556 nt._getfinalpathname(name)
2557 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002558 # Failure is expected
2559 pass
2560 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002561 os.stat(name)
2562 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002563 pass
2564
2565 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2566 self.assertEqual(1, ok)
2567
2568 handle_delta = handle_count.value - before_count
2569
2570 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002571
Jason R. Coombs3a092862013-05-27 23:21:28 -04002572@support.skip_unless_symlink
2573class NonLocalSymlinkTests(unittest.TestCase):
2574
2575 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002576 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002577 Create this structure:
2578
2579 base
2580 \___ some_dir
2581 """
2582 os.makedirs('base/some_dir')
2583
2584 def tearDown(self):
2585 shutil.rmtree('base')
2586
2587 def test_directory_link_nonlocal(self):
2588 """
2589 The symlink target should resolve relative to the link, not relative
2590 to the current directory.
2591
2592 Then, link base/some_link -> base/some_dir and ensure that some_link
2593 is resolved as a directory.
2594
2595 In issue13772, it was discovered that directory detection failed if
2596 the symlink target was not specified relative to the current
2597 directory, which was a defect in the implementation.
2598 """
2599 src = os.path.join('base', 'some_link')
2600 os.symlink('some_dir', src)
2601 assert os.path.isdir(src)
2602
2603
Victor Stinnere8d51452010-08-19 01:05:19 +00002604class FSEncodingTests(unittest.TestCase):
2605 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002606 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2607 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002608
Victor Stinnere8d51452010-08-19 01:05:19 +00002609 def test_identity(self):
2610 # assert fsdecode(fsencode(x)) == x
2611 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2612 try:
2613 bytesfn = os.fsencode(fn)
2614 except UnicodeEncodeError:
2615 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002616 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002617
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002618
Brett Cannonefb00c02012-02-29 18:31:31 -05002619
2620class DeviceEncodingTests(unittest.TestCase):
2621
2622 def test_bad_fd(self):
2623 # Return None when an fd doesn't actually exist.
2624 self.assertIsNone(os.device_encoding(123456))
2625
Paul Monson62dfd7d2019-04-25 11:36:45 -07002626 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002627 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002628 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002629 def test_device_encoding(self):
2630 encoding = os.device_encoding(0)
2631 self.assertIsNotNone(encoding)
2632 self.assertTrue(codecs.lookup(encoding))
2633
2634
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002635class PidTests(unittest.TestCase):
2636 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2637 def test_getppid(self):
2638 p = subprocess.Popen([sys.executable, '-c',
2639 'import os; print(os.getppid())'],
2640 stdout=subprocess.PIPE)
2641 stdout, _ = p.communicate()
2642 # We are the parent of our subprocess
2643 self.assertEqual(int(stdout), os.getpid())
2644
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002645 def test_waitpid(self):
2646 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002647 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002648 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002649 status = os.waitpid(pid, 0)
2650 self.assertEqual(status, (pid, 0))
2651
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002652
Victor Stinner4659ccf2016-09-14 10:57:00 +02002653class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002654 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002655 self.exitcode = 17
2656
2657 filename = support.TESTFN
2658 self.addCleanup(support.unlink, filename)
2659
2660 if not with_env:
2661 code = 'import sys; sys.exit(%s)' % self.exitcode
2662 else:
2663 self.env = dict(os.environ)
2664 # create an unique key
2665 self.key = str(uuid.uuid4())
2666 self.env[self.key] = self.key
2667 # read the variable from os.environ to check that it exists
2668 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2669 % (self.key, self.exitcode))
2670
2671 with open(filename, "w") as fp:
2672 fp.write(code)
2673
Berker Peksag81816462016-09-15 20:19:47 +03002674 args = [sys.executable, filename]
2675 if use_bytes:
2676 args = [os.fsencode(a) for a in args]
2677 self.env = {os.fsencode(k): os.fsencode(v)
2678 for k, v in self.env.items()}
2679
2680 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002681
Berker Peksag4af23d72016-09-15 20:32:44 +03002682 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002683 def test_spawnl(self):
2684 args = self.create_args()
2685 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2686 self.assertEqual(exitcode, self.exitcode)
2687
Berker Peksag4af23d72016-09-15 20:32:44 +03002688 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002689 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002690 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002691 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2692 self.assertEqual(exitcode, self.exitcode)
2693
Berker Peksag4af23d72016-09-15 20:32:44 +03002694 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002695 def test_spawnlp(self):
2696 args = self.create_args()
2697 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2698 self.assertEqual(exitcode, self.exitcode)
2699
Berker Peksag4af23d72016-09-15 20:32:44 +03002700 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002701 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002702 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002703 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2704 self.assertEqual(exitcode, self.exitcode)
2705
Berker Peksag4af23d72016-09-15 20:32:44 +03002706 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002707 def test_spawnv(self):
2708 args = self.create_args()
2709 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2710 self.assertEqual(exitcode, self.exitcode)
2711
Berker Peksag4af23d72016-09-15 20:32:44 +03002712 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002713 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002714 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002715 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2716 self.assertEqual(exitcode, self.exitcode)
2717
Berker Peksag4af23d72016-09-15 20:32:44 +03002718 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002719 def test_spawnvp(self):
2720 args = self.create_args()
2721 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2722 self.assertEqual(exitcode, self.exitcode)
2723
Berker Peksag4af23d72016-09-15 20:32:44 +03002724 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002725 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002726 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002727 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2728 self.assertEqual(exitcode, self.exitcode)
2729
Berker Peksag4af23d72016-09-15 20:32:44 +03002730 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002731 def test_nowait(self):
2732 args = self.create_args()
2733 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2734 result = os.waitpid(pid, 0)
2735 self.assertEqual(result[0], pid)
2736 status = result[1]
2737 if hasattr(os, 'WIFEXITED'):
2738 self.assertTrue(os.WIFEXITED(status))
2739 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2740 else:
2741 self.assertEqual(status, self.exitcode << 8)
2742
Berker Peksag4af23d72016-09-15 20:32:44 +03002743 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002744 def test_spawnve_bytes(self):
2745 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2746 args = self.create_args(with_env=True, use_bytes=True)
2747 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2748 self.assertEqual(exitcode, self.exitcode)
2749
Steve Dower859fd7b2016-11-19 18:53:19 -08002750 @requires_os_func('spawnl')
2751 def test_spawnl_noargs(self):
2752 args = self.create_args()
2753 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002754 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002755
2756 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002757 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002758 args = self.create_args()
2759 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002760 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002761
2762 @requires_os_func('spawnv')
2763 def test_spawnv_noargs(self):
2764 args = self.create_args()
2765 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2766 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002767 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2768 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002769
2770 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002771 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002772 args = self.create_args()
2773 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2774 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002775 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2776 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002777
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002778 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002779 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002780
Ville Skyttä49b27342017-08-03 09:00:59 +03002781 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002782 newenv = os.environ.copy()
2783 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2784 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002785 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002786 except ValueError:
2787 pass
2788 else:
2789 self.assertEqual(exitcode, 127)
2790
Ville Skyttä49b27342017-08-03 09:00:59 +03002791 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002792 newenv = os.environ.copy()
2793 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2794 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002795 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002796 except ValueError:
2797 pass
2798 else:
2799 self.assertEqual(exitcode, 127)
2800
Ville Skyttä49b27342017-08-03 09:00:59 +03002801 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002802 newenv = os.environ.copy()
2803 newenv["FRUIT=ORANGE"] = "lemon"
2804 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002805 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002806 except ValueError:
2807 pass
2808 else:
2809 self.assertEqual(exitcode, 127)
2810
Ville Skyttä49b27342017-08-03 09:00:59 +03002811 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002812 filename = support.TESTFN
2813 self.addCleanup(support.unlink, filename)
2814 with open(filename, "w") as fp:
2815 fp.write('import sys, os\n'
2816 'if os.getenv("FRUIT") != "orange=lemon":\n'
2817 ' raise AssertionError')
2818 args = [sys.executable, filename]
2819 newenv = os.environ.copy()
2820 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002821 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002822 self.assertEqual(exitcode, 0)
2823
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002824 @requires_os_func('spawnve')
2825 def test_spawnve_invalid_env(self):
2826 self._test_invalid_env(os.spawnve)
2827
2828 @requires_os_func('spawnvpe')
2829 def test_spawnvpe_invalid_env(self):
2830 self._test_invalid_env(os.spawnvpe)
2831
Serhiy Storchaka77703942017-06-25 07:33:01 +03002832
Brian Curtin0151b8e2010-09-24 13:43:43 +00002833# The introduction of this TestCase caused at least two different errors on
2834# *nix buildbots. Temporarily skip this to let the buildbots move along.
2835@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002836@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2837class LoginTests(unittest.TestCase):
2838 def test_getlogin(self):
2839 user_name = os.getlogin()
2840 self.assertNotEqual(len(user_name), 0)
2841
2842
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002843@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2844 "needs os.getpriority and os.setpriority")
2845class ProgramPriorityTests(unittest.TestCase):
2846 """Tests for os.getpriority() and os.setpriority()."""
2847
2848 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002849
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002850 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2851 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2852 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002853 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2854 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002855 raise unittest.SkipTest("unable to reliably test setpriority "
2856 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002857 else:
2858 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002859 finally:
2860 try:
2861 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2862 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002863 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002864 raise
2865
2866
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002867class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002868
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002869 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002870
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002871 def __init__(self, conn):
2872 asynchat.async_chat.__init__(self, conn)
2873 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002874 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002875 self.closed = False
2876 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002877
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002878 def handle_read(self):
2879 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002880 if self.accumulate:
2881 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002882
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002883 def get_data(self):
2884 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002885
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002886 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002887 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002888 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002889
2890 def handle_error(self):
2891 raise
2892
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002893 def __init__(self, address):
2894 threading.Thread.__init__(self)
2895 asyncore.dispatcher.__init__(self)
2896 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2897 self.bind(address)
2898 self.listen(5)
2899 self.host, self.port = self.socket.getsockname()[:2]
2900 self.handler_instance = None
2901 self._active = False
2902 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002903
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002904 # --- public API
2905
2906 @property
2907 def running(self):
2908 return self._active
2909
2910 def start(self):
2911 assert not self.running
2912 self.__flag = threading.Event()
2913 threading.Thread.start(self)
2914 self.__flag.wait()
2915
2916 def stop(self):
2917 assert self.running
2918 self._active = False
2919 self.join()
2920
2921 def wait(self):
2922 # wait for handler connection to be closed, then stop the server
2923 while not getattr(self.handler_instance, "closed", False):
2924 time.sleep(0.001)
2925 self.stop()
2926
2927 # --- internals
2928
2929 def run(self):
2930 self._active = True
2931 self.__flag.set()
2932 while self._active and asyncore.socket_map:
2933 self._active_lock.acquire()
2934 asyncore.loop(timeout=0.001, count=1)
2935 self._active_lock.release()
2936 asyncore.close_all()
2937
2938 def handle_accept(self):
2939 conn, addr = self.accept()
2940 self.handler_instance = self.Handler(conn)
2941
2942 def handle_connect(self):
2943 self.close()
2944 handle_read = handle_connect
2945
2946 def writable(self):
2947 return 0
2948
2949 def handle_error(self):
2950 raise
2951
2952
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002953@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2954class TestSendfile(unittest.TestCase):
2955
Victor Stinner8c663fd2017-11-08 14:44:44 -08002956 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002957 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002958 not sys.platform.startswith("solaris") and \
2959 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002960 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2961 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002962 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2963 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002964
2965 @classmethod
2966 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002967 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002968 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002969
2970 @classmethod
2971 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002972 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002973 support.unlink(support.TESTFN)
2974
2975 def setUp(self):
2976 self.server = SendfileTestServer((support.HOST, 0))
2977 self.server.start()
2978 self.client = socket.socket()
2979 self.client.connect((self.server.host, self.server.port))
2980 self.client.settimeout(1)
2981 # synchronize by waiting for "220 ready" response
2982 self.client.recv(1024)
2983 self.sockno = self.client.fileno()
2984 self.file = open(support.TESTFN, 'rb')
2985 self.fileno = self.file.fileno()
2986
2987 def tearDown(self):
2988 self.file.close()
2989 self.client.close()
2990 if self.server.running:
2991 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002992 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002993
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002994 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002995 """A higher level wrapper representing how an application is
2996 supposed to use sendfile().
2997 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002998 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002999 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003000 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003001 except OSError as err:
3002 if err.errno == errno.ECONNRESET:
3003 # disconnected
3004 raise
3005 elif err.errno in (errno.EAGAIN, errno.EBUSY):
3006 # we have to retry send data
3007 continue
3008 else:
3009 raise
3010
3011 def test_send_whole_file(self):
3012 # normal send
3013 total_sent = 0
3014 offset = 0
3015 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003016 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003017 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3018 if sent == 0:
3019 break
3020 offset += sent
3021 total_sent += sent
3022 self.assertTrue(sent <= nbytes)
3023 self.assertEqual(offset, total_sent)
3024
3025 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003026 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003027 self.client.close()
3028 self.server.wait()
3029 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003030 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003031 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003032
3033 def test_send_at_certain_offset(self):
3034 # start sending a file at a certain offset
3035 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003036 offset = len(self.DATA) // 2
3037 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003038 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003039 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003040 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3041 if sent == 0:
3042 break
3043 offset += sent
3044 total_sent += sent
3045 self.assertTrue(sent <= nbytes)
3046
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003047 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003048 self.client.close()
3049 self.server.wait()
3050 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003051 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003052 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003053 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003054 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003055
3056 def test_offset_overflow(self):
3057 # specify an offset > file size
3058 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00003059 try:
3060 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3061 except OSError as e:
3062 # Solaris can raise EINVAL if offset >= file length, ignore.
3063 if e.errno != errno.EINVAL:
3064 raise
3065 else:
3066 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00003067 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003068 self.client.close()
3069 self.server.wait()
3070 data = self.server.handler_instance.get_data()
3071 self.assertEqual(data, b'')
3072
3073 def test_invalid_offset(self):
3074 with self.assertRaises(OSError) as cm:
3075 os.sendfile(self.sockno, self.fileno, -1, 4096)
3076 self.assertEqual(cm.exception.errno, errno.EINVAL)
3077
Martin Panterbf19d162015-09-09 01:01:13 +00003078 def test_keywords(self):
3079 # Keyword arguments should be supported
3080 os.sendfile(out=self.sockno, offset=0, count=4096,
3081 **{'in': self.fileno})
3082 if self.SUPPORT_HEADERS_TRAILERS:
3083 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00003084 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003085
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003086 # --- headers / trailers tests
3087
Serhiy Storchaka43767632013-11-03 21:31:38 +02003088 @requires_headers_trailers
3089 def test_headers(self):
3090 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003091 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003092 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003093 headers=[b"x" * 512, b"y" * 256])
3094 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003095 total_sent += sent
3096 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003097 while total_sent < len(expected_data):
3098 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003099 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3100 offset, nbytes)
3101 if sent == 0:
3102 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003103 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003104 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003105 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003106
Serhiy Storchaka43767632013-11-03 21:31:38 +02003107 self.assertEqual(total_sent, len(expected_data))
3108 self.client.close()
3109 self.server.wait()
3110 data = self.server.handler_instance.get_data()
3111 self.assertEqual(hash(data), hash(expected_data))
3112
3113 @requires_headers_trailers
3114 def test_trailers(self):
3115 TESTFN2 = support.TESTFN + "2"
3116 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003117
3118 self.addCleanup(support.unlink, TESTFN2)
3119 create_file(TESTFN2, file_data)
3120
3121 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003122 os.sendfile(self.sockno, f.fileno(), 0, 5,
3123 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003124 self.client.close()
3125 self.server.wait()
3126 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003127 self.assertEqual(data, b"abcde123456789")
3128
3129 @requires_headers_trailers
3130 @requires_32b
3131 def test_headers_overflow_32bits(self):
3132 self.server.handler_instance.accumulate = False
3133 with self.assertRaises(OSError) as cm:
3134 os.sendfile(self.sockno, self.fileno, 0, 0,
3135 headers=[b"x" * 2**16] * 2**15)
3136 self.assertEqual(cm.exception.errno, errno.EINVAL)
3137
3138 @requires_headers_trailers
3139 @requires_32b
3140 def test_trailers_overflow_32bits(self):
3141 self.server.handler_instance.accumulate = False
3142 with self.assertRaises(OSError) as cm:
3143 os.sendfile(self.sockno, self.fileno, 0, 0,
3144 trailers=[b"x" * 2**16] * 2**15)
3145 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003146
Serhiy Storchaka43767632013-11-03 21:31:38 +02003147 @requires_headers_trailers
3148 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3149 'test needs os.SF_NODISKIO')
3150 def test_flags(self):
3151 try:
3152 os.sendfile(self.sockno, self.fileno, 0, 4096,
3153 flags=os.SF_NODISKIO)
3154 except OSError as err:
3155 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3156 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003157
3158
Larry Hastings9cf065c2012-06-22 16:30:09 -07003159def supports_extended_attributes():
3160 if not hasattr(os, "setxattr"):
3161 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003162
Larry Hastings9cf065c2012-06-22 16:30:09 -07003163 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003164 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003165 try:
3166 os.setxattr(fp.fileno(), b"user.test", b"")
3167 except OSError:
3168 return False
3169 finally:
3170 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003171
3172 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003173
3174
3175@unittest.skipUnless(supports_extended_attributes(),
3176 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003177# Kernels < 2.6.39 don't respect setxattr flags.
3178@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003179class ExtendedAttributeTests(unittest.TestCase):
3180
Larry Hastings9cf065c2012-06-22 16:30:09 -07003181 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003182 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003183 self.addCleanup(support.unlink, fn)
3184 create_file(fn)
3185
Benjamin Peterson799bd802011-08-31 22:15:17 -04003186 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003187 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003188 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003189
Victor Stinnerf12e5062011-10-16 22:12:03 +02003190 init_xattr = listxattr(fn)
3191 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003192
Larry Hastings9cf065c2012-06-22 16:30:09 -07003193 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003194 xattr = set(init_xattr)
3195 xattr.add("user.test")
3196 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003197 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3198 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3199 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003200
Benjamin Peterson799bd802011-08-31 22:15:17 -04003201 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003202 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003203 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003204
Benjamin Peterson799bd802011-08-31 22:15:17 -04003205 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003206 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003207 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003208
Larry Hastings9cf065c2012-06-22 16:30:09 -07003209 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003210 xattr.add("user.test2")
3211 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003212 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003213
Benjamin Peterson799bd802011-08-31 22:15:17 -04003214 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003215 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003216 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003217
Victor Stinnerf12e5062011-10-16 22:12:03 +02003218 xattr.remove("user.test")
3219 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003220 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3221 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3222 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3223 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003224 many = sorted("user.test{}".format(i) for i in range(100))
3225 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003226 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003227 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003228
Larry Hastings9cf065c2012-06-22 16:30:09 -07003229 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003230 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003231 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003232
3233 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3234 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003235
3236 def test_simple(self):
3237 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3238 os.listxattr)
3239
3240 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003241 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3242 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003243
3244 def test_fds(self):
3245 def getxattr(path, *args):
3246 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003247 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003248 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003249 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003250 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003251 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003252 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003253 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003254 def listxattr(path, *args):
3255 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003256 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003257 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3258
3259
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003260@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3261class TermsizeTests(unittest.TestCase):
3262 def test_does_not_crash(self):
3263 """Check if get_terminal_size() returns a meaningful value.
3264
3265 There's no easy portable way to actually check the size of the
3266 terminal, so let's check if it returns something sensible instead.
3267 """
3268 try:
3269 size = os.get_terminal_size()
3270 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003271 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003272 # Under win32 a generic OSError can be thrown if the
3273 # handle cannot be retrieved
3274 self.skipTest("failed to query terminal size")
3275 raise
3276
Antoine Pitroucfade362012-02-08 23:48:59 +01003277 self.assertGreaterEqual(size.columns, 0)
3278 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003279
3280 def test_stty_match(self):
3281 """Check if stty returns the same results
3282
3283 stty actually tests stdin, so get_terminal_size is invoked on
3284 stdin explicitly. If stty succeeded, then get_terminal_size()
3285 should work too.
3286 """
3287 try:
3288 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003289 except (FileNotFoundError, subprocess.CalledProcessError,
3290 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003291 self.skipTest("stty invocation failed")
3292 expected = (int(size[1]), int(size[0])) # reversed order
3293
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003294 try:
3295 actual = os.get_terminal_size(sys.__stdin__.fileno())
3296 except OSError as e:
3297 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3298 # Under win32 a generic OSError can be thrown if the
3299 # handle cannot be retrieved
3300 self.skipTest("failed to query terminal size")
3301 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003302 self.assertEqual(expected, actual)
3303
3304
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003305@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003306@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003307class MemfdCreateTests(unittest.TestCase):
3308 def test_memfd_create(self):
3309 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3310 self.assertNotEqual(fd, -1)
3311 self.addCleanup(os.close, fd)
3312 self.assertFalse(os.get_inheritable(fd))
3313 with open(fd, "wb", closefd=False) as f:
3314 f.write(b'memfd_create')
3315 self.assertEqual(f.tell(), 12)
3316
3317 fd2 = os.memfd_create("Hi")
3318 self.addCleanup(os.close, fd2)
3319 self.assertFalse(os.get_inheritable(fd2))
3320
3321
Victor Stinner292c8352012-10-30 02:17:38 +01003322class OSErrorTests(unittest.TestCase):
3323 def setUp(self):
3324 class Str(str):
3325 pass
3326
Victor Stinnerafe17062012-10-31 22:47:43 +01003327 self.bytes_filenames = []
3328 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003329 if support.TESTFN_UNENCODABLE is not None:
3330 decoded = support.TESTFN_UNENCODABLE
3331 else:
3332 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003333 self.unicode_filenames.append(decoded)
3334 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003335 if support.TESTFN_UNDECODABLE is not None:
3336 encoded = support.TESTFN_UNDECODABLE
3337 else:
3338 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003339 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003340 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003341 self.bytes_filenames.append(memoryview(encoded))
3342
3343 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003344
3345 def test_oserror_filename(self):
3346 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003347 (self.filenames, os.chdir,),
3348 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003349 (self.filenames, os.lstat,),
3350 (self.filenames, os.open, os.O_RDONLY),
3351 (self.filenames, os.rmdir,),
3352 (self.filenames, os.stat,),
3353 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003354 ]
3355 if sys.platform == "win32":
3356 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003357 (self.bytes_filenames, os.rename, b"dst"),
3358 (self.bytes_filenames, os.replace, b"dst"),
3359 (self.unicode_filenames, os.rename, "dst"),
3360 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003361 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003362 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003363 else:
3364 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003365 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003366 (self.filenames, os.rename, "dst"),
3367 (self.filenames, os.replace, "dst"),
3368 ))
3369 if hasattr(os, "chown"):
3370 funcs.append((self.filenames, os.chown, 0, 0))
3371 if hasattr(os, "lchown"):
3372 funcs.append((self.filenames, os.lchown, 0, 0))
3373 if hasattr(os, "truncate"):
3374 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003375 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003376 funcs.append((self.filenames, os.chflags, 0))
3377 if hasattr(os, "lchflags"):
3378 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003379 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003380 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003381 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003382 if sys.platform == "win32":
3383 funcs.append((self.bytes_filenames, os.link, b"dst"))
3384 funcs.append((self.unicode_filenames, os.link, "dst"))
3385 else:
3386 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003387 if hasattr(os, "listxattr"):
3388 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003389 (self.filenames, os.listxattr,),
3390 (self.filenames, os.getxattr, "user.test"),
3391 (self.filenames, os.setxattr, "user.test", b'user'),
3392 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003393 ))
3394 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003395 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003396 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003397 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003398
Steve Dowercc16be82016-09-08 10:35:16 -07003399
Victor Stinnerafe17062012-10-31 22:47:43 +01003400 for filenames, func, *func_args in funcs:
3401 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003402 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003403 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003404 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003405 else:
3406 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3407 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003408 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003409 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003410 except UnicodeDecodeError:
3411 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003412 else:
3413 self.fail("No exception thrown by {}".format(func))
3414
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003415class CPUCountTests(unittest.TestCase):
3416 def test_cpu_count(self):
3417 cpus = os.cpu_count()
3418 if cpus is not None:
3419 self.assertIsInstance(cpus, int)
3420 self.assertGreater(cpus, 0)
3421 else:
3422 self.skipTest("Could not determine the number of CPUs")
3423
Victor Stinnerdaf45552013-08-28 00:53:59 +02003424
3425class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003426 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003427 fd = os.open(__file__, os.O_RDONLY)
3428 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003429 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003430
Victor Stinnerdaf45552013-08-28 00:53:59 +02003431 os.set_inheritable(fd, True)
3432 self.assertEqual(os.get_inheritable(fd), True)
3433
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003434 @unittest.skipIf(fcntl is None, "need fcntl")
3435 def test_get_inheritable_cloexec(self):
3436 fd = os.open(__file__, os.O_RDONLY)
3437 self.addCleanup(os.close, fd)
3438 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003439
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003440 # clear FD_CLOEXEC flag
3441 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3442 flags &= ~fcntl.FD_CLOEXEC
3443 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003444
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003445 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003446
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003447 @unittest.skipIf(fcntl is None, "need fcntl")
3448 def test_set_inheritable_cloexec(self):
3449 fd = os.open(__file__, os.O_RDONLY)
3450 self.addCleanup(os.close, fd)
3451 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3452 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003453
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003454 os.set_inheritable(fd, True)
3455 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3456 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003457
Victor Stinnerdaf45552013-08-28 00:53:59 +02003458 def test_open(self):
3459 fd = os.open(__file__, os.O_RDONLY)
3460 self.addCleanup(os.close, fd)
3461 self.assertEqual(os.get_inheritable(fd), False)
3462
3463 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3464 def test_pipe(self):
3465 rfd, wfd = os.pipe()
3466 self.addCleanup(os.close, rfd)
3467 self.addCleanup(os.close, wfd)
3468 self.assertEqual(os.get_inheritable(rfd), False)
3469 self.assertEqual(os.get_inheritable(wfd), False)
3470
3471 def test_dup(self):
3472 fd1 = os.open(__file__, os.O_RDONLY)
3473 self.addCleanup(os.close, fd1)
3474
3475 fd2 = os.dup(fd1)
3476 self.addCleanup(os.close, fd2)
3477 self.assertEqual(os.get_inheritable(fd2), False)
3478
Zackery Spytz5be66602019-08-23 12:38:41 -06003479 def test_dup_standard_stream(self):
3480 fd = os.dup(1)
3481 self.addCleanup(os.close, fd)
3482 self.assertGreater(fd, 0)
3483
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003484 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3485 def test_dup_nul(self):
3486 # os.dup() was creating inheritable fds for character files.
3487 fd1 = os.open('NUL', os.O_RDONLY)
3488 self.addCleanup(os.close, fd1)
3489 fd2 = os.dup(fd1)
3490 self.addCleanup(os.close, fd2)
3491 self.assertFalse(os.get_inheritable(fd2))
3492
Victor Stinnerdaf45552013-08-28 00:53:59 +02003493 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3494 def test_dup2(self):
3495 fd = os.open(__file__, os.O_RDONLY)
3496 self.addCleanup(os.close, fd)
3497
3498 # inheritable by default
3499 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003500 self.addCleanup(os.close, fd2)
3501 self.assertEqual(os.dup2(fd, fd2), fd2)
3502 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003503
3504 # force non-inheritable
3505 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003506 self.addCleanup(os.close, fd3)
3507 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3508 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003509
3510 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3511 def test_openpty(self):
3512 master_fd, slave_fd = os.openpty()
3513 self.addCleanup(os.close, master_fd)
3514 self.addCleanup(os.close, slave_fd)
3515 self.assertEqual(os.get_inheritable(master_fd), False)
3516 self.assertEqual(os.get_inheritable(slave_fd), False)
3517
3518
Brett Cannon3f9183b2016-08-26 14:44:48 -07003519class PathTConverterTests(unittest.TestCase):
3520 # tuples of (function name, allows fd arguments, additional arguments to
3521 # function, cleanup function)
3522 functions = [
3523 ('stat', True, (), None),
3524 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003525 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003526 ('chflags', False, (0,), None),
3527 ('lchflags', False, (0,), None),
3528 ('open', False, (0,), getattr(os, 'close', None)),
3529 ]
3530
3531 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003532 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003533 if os.name == 'nt':
3534 bytes_fspath = bytes_filename = None
3535 else:
3536 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003537 bytes_fspath = FakePath(bytes_filename)
3538 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003539 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003540 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003541
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003542 int_fspath = FakePath(fd)
3543 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003544
3545 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3546 with self.subTest(name=name):
3547 try:
3548 fn = getattr(os, name)
3549 except AttributeError:
3550 continue
3551
Brett Cannon8f96a302016-08-26 19:30:11 -07003552 for path in (str_filename, bytes_filename, str_fspath,
3553 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003554 if path is None:
3555 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003556 with self.subTest(name=name, path=path):
3557 result = fn(path, *extra_args)
3558 if cleanup_fn is not None:
3559 cleanup_fn(result)
3560
3561 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003562 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003563 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003564
3565 if allow_fd:
3566 result = fn(fd, *extra_args) # should not fail
3567 if cleanup_fn is not None:
3568 cleanup_fn(result)
3569 else:
3570 with self.assertRaisesRegex(
3571 TypeError,
3572 'os.PathLike'):
3573 fn(fd, *extra_args)
3574
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003575 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003576 msg = r'__fspath__\(\) to return str or bytes, not %s'
3577 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003578 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003579 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003580 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003581 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003582 os.stat(FakePath(object()))
3583
Brett Cannon3f9183b2016-08-26 14:44:48 -07003584
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003585@unittest.skipUnless(hasattr(os, 'get_blocking'),
3586 'needs os.get_blocking() and os.set_blocking()')
3587class BlockingTests(unittest.TestCase):
3588 def test_blocking(self):
3589 fd = os.open(__file__, os.O_RDONLY)
3590 self.addCleanup(os.close, fd)
3591 self.assertEqual(os.get_blocking(fd), True)
3592
3593 os.set_blocking(fd, False)
3594 self.assertEqual(os.get_blocking(fd), False)
3595
3596 os.set_blocking(fd, True)
3597 self.assertEqual(os.get_blocking(fd), True)
3598
3599
Yury Selivanov97e2e062014-09-26 12:33:06 -04003600
3601class ExportsTests(unittest.TestCase):
3602 def test_os_all(self):
3603 self.assertIn('open', os.__all__)
3604 self.assertIn('walk', os.__all__)
3605
3606
Victor Stinner6036e442015-03-08 01:58:04 +01003607class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003608 check_no_resource_warning = support.check_no_resource_warning
3609
Victor Stinner6036e442015-03-08 01:58:04 +01003610 def setUp(self):
3611 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003612 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003613 self.addCleanup(support.rmtree, self.path)
3614 os.mkdir(self.path)
3615
3616 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003617 path = self.bytes_path if isinstance(name, bytes) else self.path
3618 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003619 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003620 return filename
3621
3622 def get_entries(self, names):
3623 entries = dict((entry.name, entry)
3624 for entry in os.scandir(self.path))
3625 self.assertEqual(sorted(entries.keys()), names)
3626 return entries
3627
3628 def assert_stat_equal(self, stat1, stat2, skip_fields):
3629 if skip_fields:
3630 for attr in dir(stat1):
3631 if not attr.startswith("st_"):
3632 continue
3633 if attr in ("st_dev", "st_ino", "st_nlink"):
3634 continue
3635 self.assertEqual(getattr(stat1, attr),
3636 getattr(stat2, attr),
3637 (stat1, stat2, attr))
3638 else:
3639 self.assertEqual(stat1, stat2)
3640
3641 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003642 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003643 self.assertEqual(entry.name, name)
3644 self.assertEqual(entry.path, os.path.join(self.path, name))
3645 self.assertEqual(entry.inode(),
3646 os.stat(entry.path, follow_symlinks=False).st_ino)
3647
3648 entry_stat = os.stat(entry.path)
3649 self.assertEqual(entry.is_dir(),
3650 stat.S_ISDIR(entry_stat.st_mode))
3651 self.assertEqual(entry.is_file(),
3652 stat.S_ISREG(entry_stat.st_mode))
3653 self.assertEqual(entry.is_symlink(),
3654 os.path.islink(entry.path))
3655
3656 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3657 self.assertEqual(entry.is_dir(follow_symlinks=False),
3658 stat.S_ISDIR(entry_lstat.st_mode))
3659 self.assertEqual(entry.is_file(follow_symlinks=False),
3660 stat.S_ISREG(entry_lstat.st_mode))
3661
3662 self.assert_stat_equal(entry.stat(),
3663 entry_stat,
3664 os.name == 'nt' and not is_symlink)
3665 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3666 entry_lstat,
3667 os.name == 'nt')
3668
3669 def test_attributes(self):
3670 link = hasattr(os, 'link')
3671 symlink = support.can_symlink()
3672
3673 dirname = os.path.join(self.path, "dir")
3674 os.mkdir(dirname)
3675 filename = self.create_file("file.txt")
3676 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003677 try:
3678 os.link(filename, os.path.join(self.path, "link_file.txt"))
3679 except PermissionError as e:
3680 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003681 if symlink:
3682 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3683 target_is_directory=True)
3684 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3685
3686 names = ['dir', 'file.txt']
3687 if link:
3688 names.append('link_file.txt')
3689 if symlink:
3690 names.extend(('symlink_dir', 'symlink_file.txt'))
3691 entries = self.get_entries(names)
3692
3693 entry = entries['dir']
3694 self.check_entry(entry, 'dir', True, False, False)
3695
3696 entry = entries['file.txt']
3697 self.check_entry(entry, 'file.txt', False, True, False)
3698
3699 if link:
3700 entry = entries['link_file.txt']
3701 self.check_entry(entry, 'link_file.txt', False, True, False)
3702
3703 if symlink:
3704 entry = entries['symlink_dir']
3705 self.check_entry(entry, 'symlink_dir', True, False, True)
3706
3707 entry = entries['symlink_file.txt']
3708 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3709
3710 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003711 path = self.bytes_path if isinstance(name, bytes) else self.path
3712 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003713 self.assertEqual(len(entries), 1)
3714
3715 entry = entries[0]
3716 self.assertEqual(entry.name, name)
3717 return entry
3718
Brett Cannon96881cd2016-06-10 14:37:21 -07003719 def create_file_entry(self, name='file.txt'):
3720 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003721 return self.get_entry(os.path.basename(filename))
3722
3723 def test_current_directory(self):
3724 filename = self.create_file()
3725 old_dir = os.getcwd()
3726 try:
3727 os.chdir(self.path)
3728
3729 # call scandir() without parameter: it must list the content
3730 # of the current directory
3731 entries = dict((entry.name, entry) for entry in os.scandir())
3732 self.assertEqual(sorted(entries.keys()),
3733 [os.path.basename(filename)])
3734 finally:
3735 os.chdir(old_dir)
3736
3737 def test_repr(self):
3738 entry = self.create_file_entry()
3739 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3740
Brett Cannon96881cd2016-06-10 14:37:21 -07003741 def test_fspath_protocol(self):
3742 entry = self.create_file_entry()
3743 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3744
3745 def test_fspath_protocol_bytes(self):
3746 bytes_filename = os.fsencode('bytesfile.txt')
3747 bytes_entry = self.create_file_entry(name=bytes_filename)
3748 fspath = os.fspath(bytes_entry)
3749 self.assertIsInstance(fspath, bytes)
3750 self.assertEqual(fspath,
3751 os.path.join(os.fsencode(self.path),bytes_filename))
3752
Victor Stinner6036e442015-03-08 01:58:04 +01003753 def test_removed_dir(self):
3754 path = os.path.join(self.path, 'dir')
3755
3756 os.mkdir(path)
3757 entry = self.get_entry('dir')
3758 os.rmdir(path)
3759
3760 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3761 if os.name == 'nt':
3762 self.assertTrue(entry.is_dir())
3763 self.assertFalse(entry.is_file())
3764 self.assertFalse(entry.is_symlink())
3765 if os.name == 'nt':
3766 self.assertRaises(FileNotFoundError, entry.inode)
3767 # don't fail
3768 entry.stat()
3769 entry.stat(follow_symlinks=False)
3770 else:
3771 self.assertGreater(entry.inode(), 0)
3772 self.assertRaises(FileNotFoundError, entry.stat)
3773 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3774
3775 def test_removed_file(self):
3776 entry = self.create_file_entry()
3777 os.unlink(entry.path)
3778
3779 self.assertFalse(entry.is_dir())
3780 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3781 if os.name == 'nt':
3782 self.assertTrue(entry.is_file())
3783 self.assertFalse(entry.is_symlink())
3784 if os.name == 'nt':
3785 self.assertRaises(FileNotFoundError, entry.inode)
3786 # don't fail
3787 entry.stat()
3788 entry.stat(follow_symlinks=False)
3789 else:
3790 self.assertGreater(entry.inode(), 0)
3791 self.assertRaises(FileNotFoundError, entry.stat)
3792 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3793
3794 def test_broken_symlink(self):
3795 if not support.can_symlink():
3796 return self.skipTest('cannot create symbolic link')
3797
3798 filename = self.create_file("file.txt")
3799 os.symlink(filename,
3800 os.path.join(self.path, "symlink.txt"))
3801 entries = self.get_entries(['file.txt', 'symlink.txt'])
3802 entry = entries['symlink.txt']
3803 os.unlink(filename)
3804
3805 self.assertGreater(entry.inode(), 0)
3806 self.assertFalse(entry.is_dir())
3807 self.assertFalse(entry.is_file()) # broken symlink returns False
3808 self.assertFalse(entry.is_dir(follow_symlinks=False))
3809 self.assertFalse(entry.is_file(follow_symlinks=False))
3810 self.assertTrue(entry.is_symlink())
3811 self.assertRaises(FileNotFoundError, entry.stat)
3812 # don't fail
3813 entry.stat(follow_symlinks=False)
3814
3815 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003816 self.create_file("file.txt")
3817
3818 path_bytes = os.fsencode(self.path)
3819 entries = list(os.scandir(path_bytes))
3820 self.assertEqual(len(entries), 1, entries)
3821 entry = entries[0]
3822
3823 self.assertEqual(entry.name, b'file.txt')
3824 self.assertEqual(entry.path,
3825 os.fsencode(os.path.join(self.path, 'file.txt')))
3826
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003827 def test_bytes_like(self):
3828 self.create_file("file.txt")
3829
3830 for cls in bytearray, memoryview:
3831 path_bytes = cls(os.fsencode(self.path))
3832 with self.assertWarns(DeprecationWarning):
3833 entries = list(os.scandir(path_bytes))
3834 self.assertEqual(len(entries), 1, entries)
3835 entry = entries[0]
3836
3837 self.assertEqual(entry.name, b'file.txt')
3838 self.assertEqual(entry.path,
3839 os.fsencode(os.path.join(self.path, 'file.txt')))
3840 self.assertIs(type(entry.name), bytes)
3841 self.assertIs(type(entry.path), bytes)
3842
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003843 @unittest.skipUnless(os.listdir in os.supports_fd,
3844 'fd support for listdir required for this test.')
3845 def test_fd(self):
3846 self.assertIn(os.scandir, os.supports_fd)
3847 self.create_file('file.txt')
3848 expected_names = ['file.txt']
3849 if support.can_symlink():
3850 os.symlink('file.txt', os.path.join(self.path, 'link'))
3851 expected_names.append('link')
3852
3853 fd = os.open(self.path, os.O_RDONLY)
3854 try:
3855 with os.scandir(fd) as it:
3856 entries = list(it)
3857 names = [entry.name for entry in entries]
3858 self.assertEqual(sorted(names), expected_names)
3859 self.assertEqual(names, os.listdir(fd))
3860 for entry in entries:
3861 self.assertEqual(entry.path, entry.name)
3862 self.assertEqual(os.fspath(entry), entry.name)
3863 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3864 if os.stat in os.supports_dir_fd:
3865 st = os.stat(entry.name, dir_fd=fd)
3866 self.assertEqual(entry.stat(), st)
3867 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3868 self.assertEqual(entry.stat(follow_symlinks=False), st)
3869 finally:
3870 os.close(fd)
3871
Victor Stinner6036e442015-03-08 01:58:04 +01003872 def test_empty_path(self):
3873 self.assertRaises(FileNotFoundError, os.scandir, '')
3874
3875 def test_consume_iterator_twice(self):
3876 self.create_file("file.txt")
3877 iterator = os.scandir(self.path)
3878
3879 entries = list(iterator)
3880 self.assertEqual(len(entries), 1, entries)
3881
3882 # check than consuming the iterator twice doesn't raise exception
3883 entries2 = list(iterator)
3884 self.assertEqual(len(entries2), 0, entries2)
3885
3886 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003887 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003888 self.assertRaises(TypeError, os.scandir, obj)
3889
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003890 def test_close(self):
3891 self.create_file("file.txt")
3892 self.create_file("file2.txt")
3893 iterator = os.scandir(self.path)
3894 next(iterator)
3895 iterator.close()
3896 # multiple closes
3897 iterator.close()
3898 with self.check_no_resource_warning():
3899 del iterator
3900
3901 def test_context_manager(self):
3902 self.create_file("file.txt")
3903 self.create_file("file2.txt")
3904 with os.scandir(self.path) as iterator:
3905 next(iterator)
3906 with self.check_no_resource_warning():
3907 del iterator
3908
3909 def test_context_manager_close(self):
3910 self.create_file("file.txt")
3911 self.create_file("file2.txt")
3912 with os.scandir(self.path) as iterator:
3913 next(iterator)
3914 iterator.close()
3915
3916 def test_context_manager_exception(self):
3917 self.create_file("file.txt")
3918 self.create_file("file2.txt")
3919 with self.assertRaises(ZeroDivisionError):
3920 with os.scandir(self.path) as iterator:
3921 next(iterator)
3922 1/0
3923 with self.check_no_resource_warning():
3924 del iterator
3925
3926 def test_resource_warning(self):
3927 self.create_file("file.txt")
3928 self.create_file("file2.txt")
3929 iterator = os.scandir(self.path)
3930 next(iterator)
3931 with self.assertWarns(ResourceWarning):
3932 del iterator
3933 support.gc_collect()
3934 # exhausted iterator
3935 iterator = os.scandir(self.path)
3936 list(iterator)
3937 with self.check_no_resource_warning():
3938 del iterator
3939
Victor Stinner6036e442015-03-08 01:58:04 +01003940
Ethan Furmancdc08792016-06-02 15:06:09 -07003941class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003942
3943 # Abstracted so it can be overridden to test pure Python implementation
3944 # if a C version is provided.
3945 fspath = staticmethod(os.fspath)
3946
Ethan Furmancdc08792016-06-02 15:06:09 -07003947 def test_return_bytes(self):
3948 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003949 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003950
3951 def test_return_string(self):
3952 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003953 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003954
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003955 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003956 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003957 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003958
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003959 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003960 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3961 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3962
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003963 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003964 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3965 self.assertTrue(issubclass(FakePath, os.PathLike))
3966 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003967
Ethan Furmancdc08792016-06-02 15:06:09 -07003968 def test_garbage_in_exception_out(self):
3969 vapor = type('blah', (), {})
3970 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003971 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003972
3973 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003974 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003975
Brett Cannon044283a2016-07-15 10:41:49 -07003976 def test_bad_pathlike(self):
3977 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003978 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003979 # __fspath__ attribute that is not callable.
3980 c = type('foo', (), {})
3981 c.__fspath__ = 1
3982 self.assertRaises(TypeError, self.fspath, c())
3983 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003984 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003985 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003986
Victor Stinnerc29b5852017-11-02 07:28:27 -07003987
3988class TimesTests(unittest.TestCase):
3989 def test_times(self):
3990 times = os.times()
3991 self.assertIsInstance(times, os.times_result)
3992
3993 for field in ('user', 'system', 'children_user', 'children_system',
3994 'elapsed'):
3995 value = getattr(times, field)
3996 self.assertIsInstance(value, float)
3997
3998 if os.name == 'nt':
3999 self.assertEqual(times.children_user, 0)
4000 self.assertEqual(times.children_system, 0)
4001 self.assertEqual(times.elapsed, 0)
4002
4003
Brett Cannonc78ca1e2016-06-24 12:03:43 -07004004# Only test if the C version is provided, otherwise TestPEP519 already tested
4005# the pure Python implementation.
4006if hasattr(os, "_fspath"):
4007 class TestPEP519PurePython(TestPEP519):
4008
4009 """Explicitly test the pure Python implementation of os.fspath()."""
4010
4011 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004012
4013
Fred Drake2e2be372001-09-20 21:33:42 +00004014if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05004015 unittest.main()