blob: ba9f5c35ae3188e3b4a0a7462491ebc1082ce963 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070032from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import pwd
48 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010049except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050050 all_users = []
51try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020052 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020053except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020054 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020055
Berker Peksagce643912015-05-06 06:33:17 +030056from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020057from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000058
Victor Stinner923590e2016-03-24 09:11:48 +010059
# True only when os.geteuid() exists and reports an effective user id of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
75
Victor Stinner923590e2016-03-24 09:11:48 +010076
def requires_os_func(name):
    """Return a decorator that skips the test unless ``os`` has *name*.

    *name* is the attribute looked up on the ``os`` module; the skip
    reason shown to the user is ``'requires os.<name>'``.
    """
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
79
80
def create_file(filename, content=b'content'):
    """Create the new file *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation binary mode
    ("xb"), so FileExistsError is raised when *filename* already exists.
    """
    fp = open(filename, "xb", 0)
    with fp:
        fp.write(content)
84
85
Victor Stinner689830e2019-06-26 17:31:12 +020086class MiscTests(unittest.TestCase):
87 def test_getcwd(self):
88 cwd = os.getcwd()
89 self.assertIsInstance(cwd, str)
90
Victor Stinnerec3e20a2019-06-28 18:01:59 +020091 def test_getcwd_long_path(self):
92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
94 # longer path if longer paths support is enabled. Internally, the os
95 # module uses MAXPATHLEN which is at least 1024.
96 #
97 # Use a directory name of 200 characters to fit into Windows MAX_PATH
98 # limit.
99 #
100 # On Windows, the test can stop when trying to create a path longer
101 # than MAX_PATH if long paths support is disabled:
102 # see RtlAreLongPathsEnabled().
103 min_len = 2000 # characters
104 dirlen = 200 # characters
105 dirname = 'python_test_dir_'
106 dirname = dirname + ('a' * (dirlen - len(dirname)))
107
108 with tempfile.TemporaryDirectory() as tmpdir:
Victor Stinner29f609e2019-06-28 19:39:48 +0200109 with support.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200110 expected = path
111
112 while True:
113 cwd = os.getcwd()
114 self.assertEqual(cwd, expected)
115
116 need = min_len - (len(cwd) + len(os.path.sep))
117 if need <= 0:
118 break
119 if len(dirname) > need and need > 0:
120 dirname = dirname[:need]
121
122 path = os.path.join(path, dirname)
123 try:
124 os.mkdir(path)
125 # On Windows, chdir() can fail
126 # even if mkdir() succeeded
127 os.chdir(path)
128 except FileNotFoundError:
129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
130 # ERROR_FILENAME_EXCED_RANGE (206) errors
131 # ("The filename or extension is too long")
132 break
133 except OSError as exc:
134 if exc.errno == errno.ENAMETOOLONG:
135 break
136 else:
137 raise
138
139 expected = path
140
141 if support.verbose:
142 print(f"Tested current directory length: {len(cwd)}")
143
Victor Stinner689830e2019-06-26 17:31:12 +0200144 def test_getcwdb(self):
145 cwd = os.getcwdb()
146 self.assertIsInstance(cwd, bytes)
147 self.assertEqual(os.fsdecode(cwd), os.getcwd())
148
149
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the low-level, file-descriptor based functions of os.

    Every test works on support.TESTFN (or a name derived from it), which
    is removed before and after each test.
    """

    def setUp(self):
        # lexists() rather than exists(): test_symlink_keywords() may leave
        # a symlink to a non-existing target behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return fewer bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a write or the assertion
            # above fails, so that it is never leaked.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Use a distinct name: in_file is still open and registered
            # for cleanup above.
            with open(TESTFN2, 'rb') as copied_file:
                self.assertEqual(copied_file.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Use a distinct name: in_file is still open and registered
            # for cleanup above.
            with open(TESTFN4, 'rb') as copied_file:
                read = copied_file.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200373
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374# Test attributes on return values from os.*stat* family.
375class StatAttributeTests(unittest.TestCase):
376 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200377 self.fname = support.TESTFN
378 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100379 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000380
Antoine Pitrou38425292010-09-21 18:19:07 +0000381 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000382 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383
384 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000385 self.assertEqual(result[stat.ST_SIZE], 3)
386 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000388 # Make sure all the attributes are there
389 members = dir(result)
390 for name in dir(stat):
391 if name[:3] == 'ST_':
392 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000393 if name.endswith("TIME"):
394 def trunc(x): return int(x)
395 else:
396 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000397 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000398 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000399 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000400
Larry Hastings6fe20b32012-04-19 15:07:49 -0700401 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700403 for name in 'st_atime st_mtime st_ctime'.split():
404 floaty = int(getattr(result, name) * 100000)
405 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700407
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000408 try:
409 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200410 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000411 except IndexError:
412 pass
413
414 # Make sure that assignment fails
415 try:
416 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200417 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000418 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000419 pass
420
421 try:
422 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200423 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000424 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000425 pass
426
427 try:
428 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200429 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 except AttributeError:
431 pass
432
433 # Use the stat_result constructor with a too-short tuple.
434 try:
435 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200436 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000437 except TypeError:
438 pass
439
Ezio Melotti42da6632011-03-15 05:18:48 +0200440 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000441 try:
442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
443 except TypeError:
444 pass
445
Antoine Pitrou38425292010-09-21 18:19:07 +0000446 def test_stat_attributes(self):
447 self.check_stat_attributes(self.fname)
448
449 def test_stat_attributes_bytes(self):
450 try:
451 fname = self.fname.encode(sys.getfilesystemencoding())
452 except UnicodeEncodeError:
453 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700454 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 def test_stat_result_pickle(self):
457 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200458 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
459 p = pickle.dumps(result, proto)
460 self.assertIn(b'stat_result', p)
461 if proto < 4:
462 self.assertIn(b'cos\nstat_result\n', p)
463 unpickled = pickle.loads(p)
464 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000467 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700468 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000469
470 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000471 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000472
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000473 # Make sure all the attributes are there.
474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
475 'ffree', 'favail', 'flag', 'namemax')
476 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000478
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100479 self.assertTrue(isinstance(result.f_fsid, int))
480
481 # Test that the size of the tuple doesn't change
482 self.assertEqual(len(result), 10)
483
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000484 # Make sure that assignment really fails
485 try:
486 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200487 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000488 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000489 pass
490
491 try:
492 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200493 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000494 except AttributeError:
495 pass
496
497 # Use the constructor with a too-short tuple.
498 try:
499 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200500 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501 except TypeError:
502 pass
503
Ezio Melotti42da6632011-03-15 05:18:48 +0200504 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000505 try:
506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
507 except TypeError:
508 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000509
Christian Heimes25827622013-10-12 01:27:08 +0200510 @unittest.skipUnless(hasattr(os, 'statvfs'),
511 "need os.statvfs()")
512 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700513 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200514
Serhiy Storchakabad12572014-12-15 14:03:42 +0200515 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
516 p = pickle.dumps(result, proto)
517 self.assertIn(b'statvfs_result', p)
518 if proto < 4:
519 self.assertIn(b'cos\nstatvfs_result\n', p)
520 unpickled = pickle.loads(p)
521 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200522
Serhiy Storchaka43767632013-11-03 21:31:38 +0200523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
524 def test_1686475(self):
525 # Verify that an open file can be stat'ed
526 try:
527 os.stat(r"c:\pagefile.sys")
528 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600529 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200530 except OSError as e:
531 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
535 def test_15261(self):
536 # Verify that stat'ing a closed fd does not cause crash
537 r, w = os.pipe()
538 try:
539 os.stat(r) # should not raise error
540 finally:
541 os.close(r)
542 os.close(w)
543 with self.assertRaises(OSError) as ctx:
544 os.stat(r)
545 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100546
Zachary Ware63f277b2014-06-19 09:46:37 -0500547 def check_file_attributes(self, result):
548 self.assertTrue(hasattr(result, 'st_file_attributes'))
549 self.assertTrue(isinstance(result.st_file_attributes, int))
550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
551
552 @unittest.skipUnless(sys.platform == "win32",
553 "st_file_attributes is Win32 specific")
554 def test_file_attributes(self):
555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
556 result = os.stat(self.fname)
557 self.check_file_attributes(result)
558 self.assertEqual(
559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
560 0)
561
562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 dirname = support.TESTFN + "dir"
564 os.mkdir(dirname)
565 self.addCleanup(os.rmdir, dirname)
566
567 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500568 self.check_file_attributes(result)
569 self.assertEqual(
570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
571 stat.FILE_ATTRIBUTE_DIRECTORY)
572
Berker Peksag0b4dc482016-09-17 15:49:59 +0300573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
574 def test_access_denied(self):
575 # Default to FindFirstFile WIN32_FIND_DATA when access is
576 # denied. See issue 28075.
577 # os.environ['TEMP'] should be located on a volume that
578 # supports file ACLs.
579 fname = os.path.join(os.environ['TEMP'], self.fname)
580 self.addCleanup(support.unlink, fname)
581 create_file(fname, b'ABC')
582 # Deny the right to [S]YNCHRONIZE on the file to
583 # force CreateFile to fail with ERROR_ACCESS_DENIED.
584 DETACHED_PROCESS = 8
585 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500586 # bpo-30584: Use security identifier *S-1-5-32-545 instead
587 # of localized "Users" to not depend on the locale.
588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300589 creationflags=DETACHED_PROCESS
590 )
591 result = os.stat(fname)
592 self.assertNotEqual(result.st_size, 0)
593
Victor Stinner47aacc82015-06-12 17:26:23 +0200594
595class UtimeTests(unittest.TestCase):
596 def setUp(self):
597 self.dirname = support.TESTFN
598 self.fname = os.path.join(self.dirname, "f1")
599
600 self.addCleanup(support.rmtree, self.dirname)
601 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100602 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200603
Victor Stinner47aacc82015-06-12 17:26:23 +0200604 def support_subsecond(self, filename):
605 # Heuristic to check if the filesystem supports timestamp with
606 # subsecond resolution: check if float and int timestamps are different
607 st = os.stat(filename)
608 return ((st.st_atime != st[7])
609 or (st.st_mtime != st[8])
610 or (st.st_ctime != st[9]))
611
612 def _test_utime(self, set_time, filename=None):
613 if not filename:
614 filename = self.fname
615
616 support_subsecond = self.support_subsecond(filename)
617 if support_subsecond:
618 # Timestamp with a resolution of 1 microsecond (10^-6).
619 #
620 # The resolution of the C internal function used by os.utime()
621 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
622 # test with a resolution of 1 ns requires more work:
623 # see the issue #15745.
624 atime_ns = 1002003000 # 1.002003 seconds
625 mtime_ns = 4005006000 # 4.005006 seconds
626 else:
627 # use a resolution of 1 second
628 atime_ns = 5 * 10**9
629 mtime_ns = 8 * 10**9
630
631 set_time(filename, (atime_ns, mtime_ns))
632 st = os.stat(filename)
633
634 if support_subsecond:
635 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
636 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
637 else:
638 self.assertEqual(st.st_atime, atime_ns * 1e-9)
639 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
640 self.assertEqual(st.st_atime_ns, atime_ns)
641 self.assertEqual(st.st_mtime_ns, mtime_ns)
642
643 def test_utime(self):
644 def set_time(filename, ns):
645 # test the ns keyword parameter
646 os.utime(filename, ns=ns)
647 self._test_utime(set_time)
648
649 @staticmethod
650 def ns_to_sec(ns):
651 # Convert a number of nanosecond (int) to a number of seconds (float).
652 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
653 # issue, os.utime() rounds towards minus infinity.
654 return (ns * 1e-9) + 0.5e-9
655
656 def test_utime_by_indexed(self):
657 # pass times as floating point seconds as the second indexed parameter
658 def set_time(filename, ns):
659 atime_ns, mtime_ns = ns
660 atime = self.ns_to_sec(atime_ns)
661 mtime = self.ns_to_sec(mtime_ns)
662 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
663 # or utime(time_t)
664 os.utime(filename, (atime, mtime))
665 self._test_utime(set_time)
666
667 def test_utime_by_times(self):
668 def set_time(filename, ns):
669 atime_ns, mtime_ns = ns
670 atime = self.ns_to_sec(atime_ns)
671 mtime = self.ns_to_sec(mtime_ns)
672 # test the times keyword parameter
673 os.utime(filename, times=(atime, mtime))
674 self._test_utime(set_time)
675
676 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
677 "follow_symlinks support for utime required "
678 "for this test.")
679 def test_utime_nofollow_symlinks(self):
680 def set_time(filename, ns):
681 # use follow_symlinks=False to test utimensat(timespec)
682 # or lutimes(timeval)
683 os.utime(filename, ns=ns, follow_symlinks=False)
684 self._test_utime(set_time)
685
686 @unittest.skipUnless(os.utime in os.supports_fd,
687 "fd support for utime required for this test.")
688 def test_utime_fd(self):
689 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100690 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200691 # use a file descriptor to test futimens(timespec)
692 # or futimes(timeval)
693 os.utime(fp.fileno(), ns=ns)
694 self._test_utime(set_time)
695
696 @unittest.skipUnless(os.utime in os.supports_dir_fd,
697 "dir_fd support for utime required for this test.")
698 def test_utime_dir_fd(self):
699 def set_time(filename, ns):
700 dirname, name = os.path.split(filename)
701 dirfd = os.open(dirname, os.O_RDONLY)
702 try:
703 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
704 os.utime(name, dir_fd=dirfd, ns=ns)
705 finally:
706 os.close(dirfd)
707 self._test_utime(set_time)
708
709 def test_utime_directory(self):
710 def set_time(filename, ns):
711 # test calling os.utime() on a directory
712 os.utime(filename, ns=ns)
713 self._test_utime(set_time, filename=self.dirname)
714
715 def _test_utime_current(self, set_time):
716 # Get the system clock
717 current = time.time()
718
719 # Call os.utime() to set the timestamp to the current system clock
720 set_time(self.fname)
721
722 if not self.support_subsecond(self.fname):
723 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700724 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200725 # On Windows, the usual resolution of time.time() is 15.6 ms.
726 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700727 #
728 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
729 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200730 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200731 st = os.stat(self.fname)
732 msg = ("st_time=%r, current=%r, dt=%r"
733 % (st.st_mtime, current, st.st_mtime - current))
734 self.assertAlmostEqual(st.st_mtime, current,
735 delta=delta, msg=msg)
736
737 def test_utime_current(self):
738 def set_time(filename):
739 # Set to the current time in the new way
740 os.utime(self.fname)
741 self._test_utime_current(set_time)
742
743 def test_utime_current_old(self):
744 def set_time(filename):
745 # Set to the current time in the old explicit way.
746 os.utime(self.fname, None)
747 self._test_utime_current(set_time)
748
def get_file_system(self, path):
    """Return the filesystem name of the volume holding *path*.

    Only Windows is queried (through GetVolumeInformationW); on any other
    platform, or when the query fails, None is returned.
    """
    if sys.platform != 'win32':
        # return None if the filesystem is unknown
        return None

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    import ctypes
    name_buffer = ctypes.create_unicode_buffer("", 100)
    succeeded = ctypes.windll.kernel32.GetVolumeInformationW(
        root, None, 0,
        None, None, None,
        name_buffer, len(name_buffer))
    if not succeeded:
        # return None if the filesystem is unknown
        return None
    return name_buffer.value
761
def test_large_time(self):
    # Many filesystems are limited to the year 2038. At least, the test
    # pass with NTFS filesystem.
    filesystem = self.get_file_system(self.dirname)
    if filesystem != "NTFS":
        self.skipTest("requires NTFS")

    large = 5000000000   # some day in 2128
    times = (large, large)
    os.utime(self.fname, times)
    mtime = os.stat(self.fname).st_mtime
    self.assertEqual(mtime, large)
771
def test_utime_invalid_arguments(self):
    # Check the exceptions os.utime() raises for malformed or unsupported
    # argument combinations.

    # seconds and nanoseconds parameters are mutually exclusive
    with self.assertRaises(ValueError):
        os.utime(self.fname, (5, 5), ns=(5, 5))

    # anything other than a 2-tuple is rejected for the times argument
    for bad_times in ([5, 5], (5,), (5, 5, 5)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, bad_times)

    # ... and likewise for the ns keyword
    for bad_ns in ([5, 5], (5,), (5, 5, 5)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=bad_ns)

    # optional features must be refused where the platform lacks them
    if os.utime not in os.supports_follow_symlinks:
        self.assertRaises(NotImplementedError, os.utime,
                          self.fname, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(self.fname, 'wb', 0) as fp, \
                self.assertRaises(TypeError):
            os.utime(fp.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        self.assertRaises(NotImplementedError, os.utime,
                          self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200799
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument.
    def get_bad_int(divmod_ret_val):
        # Build an object whose __divmod__ hands back the given value
        # whatever it is called with.
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    # each of these __divmod__ results must produce a TypeError
    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(bogus_result), 1))
815
Victor Stinner47aacc82015-06-12 17:26:23 +0200816
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000817from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000818
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes view where available) so
        # tearDown() can put it back, then seed the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        # Restore the snapshots taken in setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Entries installed into os.environ by setUp().
        return {"KEY%d" % n: "VALUE%d" % n for n in (1, 2, 3)}

    def _empty_mapping(self):
        # The "empty mapping" is os.environ itself after being cleared.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ('{!r}: {!r}'.format(key, value)
                 for key, value in env.items())
        expected = 'environ({{{}}})'.format(', '.join(pairs))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        saved_environ = os.environ
        try:
            os.environ = test_env.copy()
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_bytes = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_str = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup: the KeyError carries the very key object that was used
        # and has its context suppressed.
        with self.assertRaises(KeyError) as caught:
            os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

        # Deletion: same expectations.
        with self.assertRaises(KeyError) as caught:
            del os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over *collection*, add a key to os.environ, and
        # check that advancing the iterator afterwards still works.
        new_key = "__new_key__"
        new_value = "test_environ_iteration"
        iterator = iter(collection)

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = new_value

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], new_value)
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
Victor Stinner6d101392013-04-14 16:35:04 +0200998
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to tests both functions with the same code base
    def walk(self, top, **kwargs):
        try:
            follow = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        else:
            # os.walk() spells this option "followlinks"
            kwargs['followlinks'] = follow
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        testfn = support.TESTFN
        self.addCleanup(support.rmtree, testfn)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        #
        # NOTE(review): tmp5_path below actually names a file called "tmp3"
        # inside SUB21, while broken_link3 targets SUB21/tmp5 -- confirm
        # whether that mismatch is intentional.
        walk_path = join(testfn, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        t2_path = join(testfn, "TEST2")
        self.walk_path = walk_path
        self.sub1_path = sub1_path
        self.sub11_path = sub11_path
        self.link_path = join(sub2_path, "link")

        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp4_path = join(testfn, "TEST2", "tmp4")
        tmp5_path = join(sub21_path, "tmp3")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        for directory in (sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        for path in (tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path):
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # Expected (root, dirs, files) entry for SUB2; the two lists are
        # kept sorted because the tests sort what they get before comparing.
        sub2_dirs = ["SUB21"]
        sub2_files = ["tmp3"]
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            sub2_dirs = ["SUB21", "link"]
            sub2_files = ["broken_link", "broken_link2", "broken_link3",
                          "tmp3"]
        self.sub2_tree = (sub2_path, sub2_dirs, sub2_files)

        # Try to make SUB21 unreadable.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # It really is unreadable: restore access before the tree is
            # removed at cleanup time.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod did not prevent listing: drop SUB21 from both the tree
            # on disk and the expected result.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        sub2_index = 1 if flipped else 3
        walked[0][1].sort()
        sub2_entry = walked[sub2_index]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        top = self.walk_path if walk_path is None else walk_path
        # Prune the search.
        visited = []
        for entry in self.walk(top):
            visited.append(entry)
            subdirs = entry[1]
            # Don't descend into SUB1.
            if 'SUB1' in subdirs:
                # Note that this also mutates the dirs we appended to
                # visited!
                subdirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        sub2_index = 0 if flipped else 2
        walked[3][1].sort()
        sub2_entry = walked[sub2_index]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(sub2_entry,
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        via_link = next(
            (entry for entry in walk_it if entry[0] == self.link_path),
            None)
        if via_link is None:
            self.fail("Didn't follow symlink with followlinks=True")
        _, dirs, files = via_link
        self.assertEqual(dirs, [])
        self.assertEqual(files, ["tmp4"])

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after the walk has already listed it.
        os.rename(path1, path1new)
        try:
            roots = [entry[0] for entry in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            # every other directory listed under the top must still have
            # been visited
            for other in dirs:
                if other == dir1:
                    continue
                self.assertIn(os.path.join(root, other), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001172
Charles-François Natali7372b062012-02-05 15:15:38 +01001173
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk() to the 3-tuple shape the inherited tests expect by
        # dropping the directory descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Indirection point so subclasses can substitute another variant.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied before being updated. For every
        combination of topdown and follow_symlinks, each root yielded by
        fwalk() must also be yielded by os.walk() with the same sets of
        directories and files.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # mask the real error with a NameError on an unbound fd.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1241
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Walk with a bytes path, but show the inherited tests str names.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top),
                                                     **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Write the caller's in-place edits (e.g. pruning) back into
            # the lists that os.walk() itself goes on to use.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1254
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Walk with a bytes path, but show the inherited tests str names.
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Write the caller's in-place edits back into the lists that
            # os.fwalk() itself goes on to use.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1266
Charles-François Natali7372b062012-02-05 15:15:38 +01001267
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The explicit mode is expected on the leaf only; the
                # parent gets 0o777 filtered through the 0o002 umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # Restore the process umask even when an assertion above fails,
            # so the change cannot leak into later tests.
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            # A regular file in the way is an error whatever exist_ok says.
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() calls os.removedirs() on
            # this path and would fail on a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1360
Andrew Svetlov405faed2012-12-25 12:18:09 +02001361
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Exercise os.chown() on a scratch directory shared by every test."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # Numbers that are not integers must be rejected in either position.
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        # Genuine integers, including -1, are accepted and yield None.
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        # Switch the group twice and confirm each change through os.stat().
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and confirm each change through os.stat().
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls sit in the same block: the test passes as soon as one of
        # them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1420
1421
class RemoveDirsTests(unittest.TestCase):
    """Check how far os.removedirs() prunes a chain of nested directories."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in 'dira' stops the pruning right above 'dirb'.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # A file in the leaf directory makes the very first removal fail.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1461
1462
class DevNullTests(unittest.TestCase):
    """Sanity checks for the null device named by os.devnull."""

    def test_devnull(self):
        # An unbuffered binary write to the null device must be accepted.
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            # The file is closed explicitly here; leaving the with block
            # then closes it a second time.
            sink.close()
        # Reading from the null device returns no data.
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001470
Andrew Svetlov405faed2012-12-25 12:18:09 +02001471
class URandomTests(unittest.TestCase):
    """Basic properties of os.urandom(): length, type and variability."""

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        # Two consecutive draws are expected to differ.
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it wrote to its standard output.
        lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        code = '\n'.join(lines)
        stdout = assert_python_ok('-c', code)[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Separate processes are expected to produce different data.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001501
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001502
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom(); skipped when the call reports ENOSYS."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive draws are expected to differ.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1544
1545
# os.urandom() needs no file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the matching build-time config variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(name) == 1
    for name in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001552
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter because the scripts close
    file descriptors or lower the RLIMIT_NOFILE limit.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The child writes two 4-byte draws; they must differ from each other
        # and from what a second run of the same script produces.
        _, out, _ = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        _, out2, _ = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1632
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001633
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stub out os.execv and os.execve while the ``with`` block runs.

    The context manager yields a list that records every call made to the
    stubs as ``(function name, first arg, remaining args)`` tuples.  The
    stubs always raise, since the real functions never return on success:
    the execv stub raises RuntimeError and the execve stub raises
    OSError(ENOTDIR).

    If *defpath* is not None, os.defpath is replaced by it as well.  The
    original os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups failed inside it, the finally clause would hit unbound names
    # and hide the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1666
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Argument validation and program lookup of the os.exec*() functions."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        missing = 'no such app-'
        self.assertRaises(OSError, os.execvpe, missing, [missing], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list or an empty first argument is rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        # Same validation for execvpe, whatever the environment argument.
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) and check the calls recorded by the exec stubs.
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the expected looked-up path is the encoded one.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            # The error must not carry a winerror of 0.
            self.assertTrue(exc.winerror is None or exc.winerror != 0)
        else:
            self.fail('No OSError raised')
1771
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001772
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist."""

    def setUp(self):
        # Every test relies on TESTFN being absent; fail early otherwise.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the file does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # mkdir() must fail while a regular file with that name exists; the
        # file is kept open for the duration of the call.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001807
Victor Stinnere77c9742016-03-25 10:28:23 +01001808
class TestInvalidFD(unittest.TestCase):
    """Check that fd-based os functions report EBADF for an invalid fd."""

    # Functions taking the file descriptor as their only argument.  "close"
    # is left out because it doesn't raise an exception on some platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(f):
        # Build a test method for os.<f>; the generated test does nothing
        # when the function is not available.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper

    # Install one generated test_<name> method per entry of `singles`.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() returns False instead of raising.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both functions are called with the bad fd as first argument.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both functions are called with the bad fd as first argument.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        target = bytearray(10)
        self.check(os.readv, [target])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1907
Brian Curtin1b9df392010-11-24 20:24:31 +00001908
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        # Hard-link file2 to a freshly created file1, then verify that both
        # names refer to the same open file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            same = os.path.sameopenfile(f1.fileno(), f2.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # Same scenario with both names encoded to bytes.
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Append a non-ASCII character to the names used by the scenario.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1945
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Permission and argument checks for the os.set*uid()/set*gid() calls.

    Each test expects TypeError for a non-integer id and OverflowError for
    an id of 1 << 32.  When the process is not root, it also expects OSError
    when switching to id 0 (the group variants skip that check when
    HAVE_WHEEL_GROUP is true).
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # The type check must target seteuid itself; it used to call
        # os.setegid, which left seteuid's validation untested.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002013
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check that os functions cope with non-ASCII / undecodable file names.

    setUp() builds a directory holding one empty file per test file name
    that os.fsencode() can encode; each test then walks ``self.unicodefn``,
    the set of those names as decoded by os.fsdecode().
    """

    def setUp(self):
        # Prefer the most "difficult" directory name the support module
        # offers, falling back to the plain one.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names, in the order they are tried.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names that can be encoded to bytes.
        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before letting the error propagate.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            without_arg = set(os.listdir())
            with_arg = set(os.listdir(os.sep))
            self.assertEqual(without_arg, with_arg)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must simply succeed.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002085
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready. When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1s apart, for the child's greeting.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the child exited early or never wrote anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script; it must then exit.

        *name* is only used in the failure message.  The helper script
        (win_console_handler.py, next to this file) receives the tag name of
        a one-byte shared mmap; this method waits for that byte to become 1
        before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # The mapping used to be left open for the garbage collector; close
        # it deterministically once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  The
        # previous truthiness test (``not proc.poll()``) also treated an
        # exit status of 0 as "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2200
2201
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, remembering the sorted entry names for the assertions.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            self.created_paths = names
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names.extend([dir_name, file_name])
        self.created_paths = names
        names.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        bytes_listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        str_listing = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        absolute = os.path.abspath(support.TESTFN)
        bytes_listing = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002248
2249
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    # Link name and target, as str and as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert two paths are equal after os.path.normcase().

        On win32 a leading ``\\\\?\\`` prefix is dropped from either side
        before comparing.
        """
        left, right = os.path.normcase(left), os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def without_prefix(path):
                prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
                return path[4:] if path.startswith(prefix) else path

            left = without_prefix(left)
            right = without_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        wrapped_target = FakePath(self.filelink_target)
        for not_a_link in (self.filelink_target, wrapped_target):
            self.assertRaises(OSError, os.readlink, not_a_link)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2309
2310
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os and os.path on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: targets exist, link names are free.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists: the missing link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat/lstat of *link* against *target*, as str and as bytes.

        os.stat() of the link must equal os.stat() of the target, while
        os.lstat() of the link must differ from os.stat() of the link.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            try:
                os.symlink(src, dest)
            except FileNotFoundError:
                pass
            else:
                # The link was unexpectedly created; remove it best-effort.
                try:
                    os.remove(dest)
                except OSError:
                    pass
            # Also test with bytes, since that is a separate code path.
            try:
                os.symlink(os.fsencode(src), os.fsencode(dest))
            except FileNotFoundError:
                pass
            else:
                try:
                    os.remove(dest)
                except OSError:
                    pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        # Without this guard os.listdir() raises on systems that have no
        # WindowsApps directory, turning a "nothing to test" situation into
        # a test error instead of a skip.
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]

        for alias in aliases:
            if support.verbose:
                print()
                print("Testing with", alias)
            st = os.lstat(alias)
            self.assertEqual(st, os.stat(alias))
            self.assertFalse(stat.S_ISLNK(st.st_mode))
            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
            # testing the first one we see is sufficient
            break
        else:
            self.skipTest("test requires an app execution alias")
2484
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created through _winapi.CreateJunction()."""

    # The junction is created in the current directory and points at the
    # directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: target exists, junction name is free.
        assert os.path.exists(self.junction_target)
        assert not os.path.lexists(self.junction)

    def tearDown(self):
        if not os.path.lexists(self.junction):
            return
        os.unlink(self.junction)

    def test_create_junction(self):
        junction, target = self.junction, self.junction_target
        _winapi.CreateJunction(target, junction)
        for predicate in (os.path.lexists, os.path.exists, os.path.isdir):
            self.assertTrue(predicate(junction))
        self.assertNotEqual(os.stat(junction), os.lstat(junction))
        self.assertEqual(os.stat(junction), os.stat(target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(junction))
        expected = os.path.normcase("\\\\?\\" + target)
        self.assertEqual(expected,
                         os.path.normcase(os.readlink(junction)))

    def test_unlink_removes_junction(self):
        junction = self.junction
        _winapi.CreateJunction(self.junction_target, junction)
        self.assertTrue(os.path.exists(junction))
        self.assertTrue(os.path.lexists(junction))

        os.unlink(junction)
        self.assertFalse(os.path.exists(junction))
2518
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # nt._getfinalpathname() and os.stat() must leave the process handle
        # count unchanged, on the failing paths as well as on success.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = wintypes.DWORD()

        def current_handle_count():
            # Refresh handle_count in place and return its value.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = current_handle_count()

        # The entries before __file__ are meant to hit the error path,
        # __file__ tests the success path
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected for some names, so any exception from
                # either call is ignored.
                with contextlib.suppress(Exception):
                    nt._getfinalpathname(name)
                with contextlib.suppress(Exception):
                    os.stat(name)

        handle_delta = current_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002568
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion rather than a bare ``assert``: the latter
        # is compiled away under ``python -O``, which would leave this test
        # checking nothing at all.
        self.assertTrue(os.path.isdir(src))
2599
2600
class FSEncodingTests(unittest.TestCase):
    """Basic properties of os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back equal to the input.
        raw_name = b'abc\xff'
        text_name = 'abc\u0141'
        self.assertEqual(os.fsencode(raw_name), raw_name)
        self.assertEqual(os.fsdecode(text_name), text_name)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for original in samples:
            try:
                encoded = os.fsencode(original)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding; nothing to
                # round-trip.
                continue
            decoded = os.fsdecode(encoded)
            self.assertEqual(decoded, original)
Victor Stinnere8d51452010-08-19 01:05:19 +00002614
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002615
Brett Cannonefb00c02012-02-29 18:31:31 -05002616
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding that
        # the codecs module can look up is expected.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        codec_info = codecs.lookup(stdin_encoding)
        self.assertTrue(codec_info)
2630
2631
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        python = sys.executable
        argv = [python, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        pid = os.spawnv(os.P_NOWAIT, FakePath(python), argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2648
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002649
Victor Stinner4659ccf2016-09-14 10:57:00 +02002650class SpawnTests(unittest.TestCase):
def create_args(self, *, with_env=False, use_bytes=False):
    """Write a small exit script to support.TESTFN and return its argv.

    The script exits with ``self.exitcode`` (set here to 17).  The file is
    registered for removal with addCleanup().

    with_env:  also build ``self.env``, a copy of os.environ extended with
               a unique key (``self.key``) that the script reads back, so
               the script fails if the environment was not passed on.
    use_bytes: return the arguments (and ``self.env`` when it was built)
               encoded with os.fsencode().

    Returns the list ``[sys.executable, filename]``.
    """
    self.exitcode = 17

    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)

    if not with_env:
        code = 'import sys; sys.exit(%s)' % self.exitcode
    else:
        self.env = dict(os.environ)
        # create an unique key
        self.key = str(uuid.uuid4())
        self.env[self.key] = self.key
        # read the variable from os.environ to check that it exists
        code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                % (self.key, self.exitcode))

    with open(filename, "w") as fp:
        fp.write(code)

    args = [sys.executable, filename]
    if use_bytes:
        args = [os.fsencode(a) for a in args]
        # self.env only exists when with_env was requested; encoding it
        # unconditionally raised AttributeError for
        # create_args(use_bytes=True).
        if with_env:
            self.env = {os.fsencode(k): os.fsencode(v)
                        for k, v in self.env.items()}

    return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002678
@requires_os_func('spawnl')
def test_spawnl(self):
    # The spawned script's exit status must come back from os.spawnl().
    argv = self.create_args()
    program = argv[0]
    status = os.spawnl(os.P_WAIT, program, *argv)
    self.assertEqual(status, self.exitcode)
2684
@requires_os_func('spawnle')
def test_spawnle(self):
    # Same as test_spawnl, with the environment passed as last argument.
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnle(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2690
@requires_os_func('spawnlp')
def test_spawnlp(self):
    # The spawned script's exit status must come back from os.spawnlp().
    argv = self.create_args()
    program = argv[0]
    status = os.spawnlp(os.P_WAIT, program, *argv)
    self.assertEqual(status, self.exitcode)
2696
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
    # Same as test_spawnlp, with the environment passed as last argument.
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnlpe(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2702
Berker Peksag4af23d72016-09-15 20:32:44 +03002703 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002704 def test_spawnv(self):
2705 args = self.create_args()
2706 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2707 self.assertEqual(exitcode, self.exitcode)
2708
Berker Peksag4af23d72016-09-15 20:32:44 +03002709 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002710 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002711 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002712 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2713 self.assertEqual(exitcode, self.exitcode)
2714
Berker Peksag4af23d72016-09-15 20:32:44 +03002715 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002716 def test_spawnvp(self):
2717 args = self.create_args()
2718 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2719 self.assertEqual(exitcode, self.exitcode)
2720
Berker Peksag4af23d72016-09-15 20:32:44 +03002721 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002722 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002723 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002724 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2725 self.assertEqual(exitcode, self.exitcode)
2726
Berker Peksag4af23d72016-09-15 20:32:44 +03002727 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002728 def test_nowait(self):
2729 args = self.create_args()
2730 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2731 result = os.waitpid(pid, 0)
2732 self.assertEqual(result[0], pid)
2733 status = result[1]
2734 if hasattr(os, 'WIFEXITED'):
2735 self.assertTrue(os.WIFEXITED(status))
2736 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2737 else:
2738 self.assertEqual(status, self.exitcode << 8)
2739
Berker Peksag4af23d72016-09-15 20:32:44 +03002740 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002741 def test_spawnve_bytes(self):
2742 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2743 args = self.create_args(with_env=True, use_bytes=True)
2744 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2745 self.assertEqual(exitcode, self.exitcode)
2746
Steve Dower859fd7b2016-11-19 18:53:19 -08002747 @requires_os_func('spawnl')
2748 def test_spawnl_noargs(self):
2749 args = self.create_args()
2750 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002751 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002752
2753 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002754 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002755 args = self.create_args()
2756 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002757 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002758
2759 @requires_os_func('spawnv')
2760 def test_spawnv_noargs(self):
2761 args = self.create_args()
2762 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2763 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002764 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2765 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002766
2767 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002768 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002769 args = self.create_args()
2770 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2771 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002772 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2773 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002774
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002775 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002776 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002777
Ville Skyttä49b27342017-08-03 09:00:59 +03002778 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002779 newenv = os.environ.copy()
2780 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2781 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002782 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002783 except ValueError:
2784 pass
2785 else:
2786 self.assertEqual(exitcode, 127)
2787
Ville Skyttä49b27342017-08-03 09:00:59 +03002788 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002789 newenv = os.environ.copy()
2790 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2791 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002792 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002793 except ValueError:
2794 pass
2795 else:
2796 self.assertEqual(exitcode, 127)
2797
Ville Skyttä49b27342017-08-03 09:00:59 +03002798 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002799 newenv = os.environ.copy()
2800 newenv["FRUIT=ORANGE"] = "lemon"
2801 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002802 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002803 except ValueError:
2804 pass
2805 else:
2806 self.assertEqual(exitcode, 127)
2807
Ville Skyttä49b27342017-08-03 09:00:59 +03002808 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002809 filename = support.TESTFN
2810 self.addCleanup(support.unlink, filename)
2811 with open(filename, "w") as fp:
2812 fp.write('import sys, os\n'
2813 'if os.getenv("FRUIT") != "orange=lemon":\n'
2814 ' raise AssertionError')
2815 args = [sys.executable, filename]
2816 newenv = os.environ.copy()
2817 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002818 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002819 self.assertEqual(exitcode, 0)
2820
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002821 @requires_os_func('spawnve')
2822 def test_spawnve_invalid_env(self):
2823 self._test_invalid_env(os.spawnve)
2824
2825 @requires_os_func('spawnvpe')
2826 def test_spawnvpe_invalid_env(self):
2827 self._test_invalid_env(os.spawnvpe)
2828
Serhiy Storchaka77703942017-06-25 07:33:01 +03002829
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2838
2839
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise the nice value by one, check the change is visible, then
        # try to restore the starting value.
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), original + 1)
        try:
            current = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and current <= 19:
                # Starting at 19 or above, the increment cannot be checked.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % original)
            self.assertEqual(current, original + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as exc:
                # Restoring may be refused with EACCES; that is tolerated,
                # any other error is propagated.
                if exc.errno != errno.EACCES:
                    raise
2862
2863
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening asyncore dispatcher that polls its socket map in a thread.

    Each accepted connection is wrapped in a Handler, and the most recent
    one is kept in ``handler_instance`` so tests can inspect what it
    received.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that records the bytes it receives."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read from the socket but dropped.
            self.accumulate = True
            self.closed = False
            # Greeting the client can wait for to synchronize with us.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True between the start of run() and a call to stop()."""
        return self._active

    def start(self):
        """Start the thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the polling loop to finish and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # Use the lock as a context manager so it is released even when
            # asyncore.loop() raises (handle_error() re-raises).
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2948
2949
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Each test gets a fresh
    server, a connected client socket and a read-only handle on that file.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET, i.e. disconnected) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, so it has to be passed via ** unpacking)
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are sent before the trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each total 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each total 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are tolerated here; other errors are failures.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003154
3155
def supports_extended_attributes():
    """Return True if os.setxattr() works on a freshly created file.

    The probe creates support.TESTFN exclusively, tries to set the
    "user.test" attribute on it, and always removes the file afterwards.
    """
    if hasattr(os, "setxattr"):
        try:
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    usable = False
                else:
                    usable = True
        finally:
            support.unlink(support.TESTFN)
        return usable

    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07003170
3171
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr on one file."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the attribute scenario with names converted through *s*.

        *s* turns a str attribute name into the type under test.  The four
        callables are the functions to exercise, and **kwargs is forwarded
        to every get/set/remove call.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def names_now():
            return set(listxattr(path))

        def read(name):
            return getxattr(path, name, **kwargs)

        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(names_now(), expected)
        self.assertEqual(read(b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(read(b"user.test"), b"hello")

        # XATTR_CREATE on an existing attribute fails with EEXIST.
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(caught.exception.errno, errno.EEXIST)

        # XATTR_REPLACE on a missing attribute fails with ENODATA.
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(names_now(), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute can no longer be read.
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)

        expected.remove("user.test")
        self.assertEqual(names_now(), expected)
        self.assertEqual(read(s("user.test2")), b"foo")
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(read(s("user.test")), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # Set 100 attributes and check they are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(names_now(), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with fsencoded
        # names, removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each os function so it operates on a file descriptor opened
        # from the path instead of on the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3255
3256
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check that get_terminal_size() returns something sensible.

        The real terminal size cannot be checked in an easy, portable way,
        so only require non-negative columns and lines.
        """
        try:
            dimensions = os.get_terminal_size()
        except OSError as exc:
            if sys.platform != "win32" and exc.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(dimensions.columns, 0)
        self.assertGreaterEqual(dimensions.lines, 0)

    def test_stty_match(self):
        """Check that get_terminal_size() agrees with ``stty size``.

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  If stty succeeded, get_terminal_size() should too.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        fields = raw.decode().split()
        # The two fields are compared in reversed order.
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            if sys.platform != "win32" and exc.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3300
3301
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag.
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        # closefd=False: the descriptor is closed by the cleanup above.
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created with default flags must be non-inheritable too.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3317
3318
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        # str subclass: used to verify that the original object (not a
        # converted copy) ends up in OSError.filename.
        class Str(str):
            pass

        # Prefer the "difficult" names when the support module provides them.
        if support.TESTFN_UNENCODABLE is None:
            decoded = support.TESTFN
        else:
            decoded = support.TESTFN_UNENCODABLE
        self.unicode_filenames = [decoded, Str(decoded)]

        if support.TESTFN_UNDECODABLE is None:
            encoded = os.fsencode(support.TESTFN)
        else:
            encoded = support.TESTFN_UNDECODABLE
        self.bytes_filenames = [
            encoded,
            bytearray(encoded),
            memoryview(encoded),
        ]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each item is (names to try, function, *extra positional args).
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_str = self.unicode_filenames
        on_windows = sys.platform == "win32"

        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            # On win32 the source and destination types are kept matched.
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_str, os.rename, "dst"),
                (only_str, os.replace, "dst"),
                (only_str, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions are only exercised where the platform has them.
        if hasattr(os, "chown"):
            funcs.append((every, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            funcs.append((every, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            funcs.append((every, os.truncate, 0))
        if hasattr(os, "chflags"):
            funcs.append((every, os.chflags, 0))
        if hasattr(os, "lchflags"):
            funcs.append((every, os.lchflags, 0))
        if hasattr(os, "chroot"):
            funcs.append((every, os.chroot,))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_str, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink,))

        for candidates, func, *extra in funcs:
            for name in candidates:
                # Names that are neither str nor bytes (bytearray,
                # memoryview) are additionally expected to warn.
                plain_name = isinstance(name, (str, bytes))
                try:
                    if not plain_name:
                        with self.assertWarnsRegex(DeprecationWarning,
                                                   'should be'):
                            func(name, *extra)
                    else:
                        func(name, *extra)
                except UnicodeDecodeError:
                    pass
                except OSError as err:
                    # Identity, not equality: the same object must come back.
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail("No exception thrown by {}".format(func))
3411
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        # None means the count is unknown; nothing can be verified then.
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3420
Victor Stinnerdaf45552013-08-28 00:53:59 +02003421
class FDInheritanceTests(unittest.TestCase):
    """Descriptors created through the os module must be non-inheritable."""

    def test_get_set_inheritable(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(handle, fcntl.F_GETFD)
        fcntl.fcntl(handle, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # FD_CLOEXEC is expected to be set right after os.open() ...
        cloexec_bit = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

        # ... and cleared once the descriptor is made inheritable.
        os.set_inheritable(handle, True)
        cloexec_bit = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, 0)

    def test_open(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_ends = os.pipe()
        for end in pipe_ends:
            self.addCleanup(os.close, end)
        for end in pipe_ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, original)

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        original = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, original)
        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, source)

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, target)
        returned = os.dup2(source, target)
        self.assertEqual(returned, target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        other_target = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, other_target)
        returned = os.dup2(source, other_target, inheritable=False)
        self.assertEqual(returned, other_target)
        self.assertFalse(os.get_inheritable(other_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_pair = os.openpty()
        for end in pty_pair:
            self.addCleanup(os.close, end)
        for end in pty_pair:
            self.assertEqual(os.get_inheritable(end), False)
3509
3510
class PathTConverterTests(unittest.TestCase):
    """Exercise how os functions convert their path argument."""

    # Each entry: (os function name, whether an fd is accepted as the path,
    # extra positional arguments, function used to clean up the result).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        # On 'nt' the bytes variants are left out entirely.
        if os.name != 'nt':
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = FakePath(bytes_filename)
        else:
            bytes_filename = None
            bytes_fspath = None

        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        possible_paths = (str_filename, bytes_filename, str_fspath,
                          bytes_fspath)
        usable_paths = [p for p in possible_paths if p is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Function not available on this platform.
                    continue

                # Every valid spelling of the path must be accepted.
                for path in usable_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like wrapping an int is rejected.
                with self.assertRaisesRegex(TypeError,
                                            'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        # The error message has to name the offending return type.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        bad_values = (
            (r'int', 2),
            (r'float', 2.34),
            (r'object', object()),
        )
        for type_name, value in bad_values:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(value))
3575
Brett Cannon3f9183b2016-08-26 14:44:48 -07003576
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip test for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch the mode off and back on, reading it back each time.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
3590
3591
Yury Selivanov97e2e062014-09-26 12:33:06 -04003592
class ExportsTests(unittest.TestCase):
    """Spot-check the contents of os.__all__."""

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3597
3598
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test works inside a directory that setUp() creates and that a
    registered cleanup removes again.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        """Create the scratch directory and remember it as str and bytes."""
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        The returned path is bytes when *name* is bytes, str otherwise.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return {entry name: entry}.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, only the ``st_*`` attributes are compared
        and ``st_dev``, ``st_ino`` and ``st_nlink`` are left out.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Validate *entry* against the caller's expectations and os.stat().

        *is_dir*, *is_file* and *is_symlink* are the values the caller
        expects from entry.is_dir(), entry.is_file() and entry.is_symlink()
        (symlinks followed for the first two).
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Compare with what the caller expects; checking only against
        # os.stat() would leave these arguments unused.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        The directory is scanned as bytes when *name* is bytes. Fails the
        test unless exactly one entry exists and it is called *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry scandir() yields for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            # Always restore the working directory for the other tests.
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs in that case.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target is what turns the link into a broken one.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            # Name and path must be exactly bytes, not the bytes-like type
            # that was passed in.
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning by fd, the entry path is the bare name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # Partially consumed and never closed: dropping it must warn.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3931
Victor Stinner6036e442015-03-08 01:58:04 +01003932
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Looked up through the class so that a subclass can swap in another
    # implementation (e.g. the pure Python one when a C version exists).
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes objects come back unchanged.
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str objects come back unchanged.
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for wrapped in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(wrapped)

            self.assertEqual(wrapped, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        unwrapped = self.fspath(FakePath('#feelthegil'))
        self.assertEqual('#feelthegil', unwrapped)
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are not str, bytes or path-like are refused.
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(junk)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        with self.assertRaises(TypeError):
            self.fspath(FakePath(42))
        # __fspath__ attribute that is not callable.
        broken_cls = type('foo', (), {})
        broken_cls.__fspath__ = 1
        with self.assertRaises(TypeError):
            self.fspath(broken_cls())
        # __fspath__ raises an exception.
        with self.assertRaises(ZeroDivisionError):
            self.fspath(FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003978
Victor Stinnerc29b5852017-11-02 07:28:27 -07003979
class TimesTests(unittest.TestCase):
    """Shape and type checks for the result of os.times()."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every named field has to be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # On 'nt' these three fields are expected to be zero.
        if os.name == 'nt':
            for zero_value in (result.children_user,
                               result.children_system,
                               result.elapsed):
                self.assertEqual(zero_value, 0)
3994
3995
# os._fspath only exists when os.fspath is provided by a C implementation.
# Without it, TestPEP519 has already covered the pure Python version and no
# second run is needed.
if hasattr(os, '_fspath'):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against the Python-level function.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004004
4005
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()