blob: 03152072c1bf5363c267ce9153a45e6d4200f0ef [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020025import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020026import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070028import types
Victor Stinner47aacc82015-06-12 17:26:23 +020029import unittest
30import uuid
31import warnings
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030034from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080035from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070037from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020038
Antoine Pitrouec34ab52013-08-16 20:44:38 +020039try:
40 import resource
41except ImportError:
42 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020043try:
44 import fcntl
45except ImportError:
46 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010047try:
48 import _winapi
49except ImportError:
50 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020051try:
R David Murrayf2ad1732014-12-25 18:36:56 -050052 import pwd
53 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010054except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050055 all_users = []
56try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020057 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020058except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020060
Berker Peksagce643912015-05-06 06:33:17 +030061from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080062from test.support import unix_shell
63from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000064
Victor Stinner923590e2016-03-24 09:11:48 +010065
# True only when os.geteuid() exists and reports the superuser (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated when the platform check succeeds.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
81
Victor Stinner923590e2016-03-24 09:11:48 +010082
def requires_os_func(name):
    """Return a decorator skipping a test unless the os module has *name*.

    The skip reason reads 'requires os.<name>'.
    """
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
85
86
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so FileExistsError is raised when *filename* already exists.
    """
    fp = open(filename, "xb", 0)
    with fp:
        fp.write(content)
90
91
Victor Stinner689830e2019-06-26 17:31:12 +020092class MiscTests(unittest.TestCase):
93 def test_getcwd(self):
94 cwd = os.getcwd()
95 self.assertIsInstance(cwd, str)
96
Victor Stinnerec3e20a2019-06-28 18:01:59 +020097 def test_getcwd_long_path(self):
98 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
99 # Windows, MAX_PATH is defined as 260 characters, but Windows supports
100 # longer path if longer paths support is enabled. Internally, the os
101 # module uses MAXPATHLEN which is at least 1024.
102 #
103 # Use a directory name of 200 characters to fit into Windows MAX_PATH
104 # limit.
105 #
106 # On Windows, the test can stop when trying to create a path longer
107 # than MAX_PATH if long paths support is disabled:
108 # see RtlAreLongPathsEnabled().
109 min_len = 2000 # characters
110 dirlen = 200 # characters
111 dirname = 'python_test_dir_'
112 dirname = dirname + ('a' * (dirlen - len(dirname)))
113
114 with tempfile.TemporaryDirectory() as tmpdir:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800115 with os_helper.change_cwd(tmpdir) as path:
Victor Stinnerec3e20a2019-06-28 18:01:59 +0200116 expected = path
117
118 while True:
119 cwd = os.getcwd()
120 self.assertEqual(cwd, expected)
121
122 need = min_len - (len(cwd) + len(os.path.sep))
123 if need <= 0:
124 break
125 if len(dirname) > need and need > 0:
126 dirname = dirname[:need]
127
128 path = os.path.join(path, dirname)
129 try:
130 os.mkdir(path)
131 # On Windows, chdir() can fail
132 # even if mkdir() succeeded
133 os.chdir(path)
134 except FileNotFoundError:
135 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
136 # ERROR_FILENAME_EXCED_RANGE (206) errors
137 # ("The filename or extension is too long")
138 break
139 except OSError as exc:
140 if exc.errno == errno.ENAMETOOLONG:
141 break
142 else:
143 raise
144
145 expected = path
146
147 if support.verbose:
148 print(f"Tested current directory length: {len(cwd)}")
149
Victor Stinner689830e2019-06-26 17:31:12 +0200150 def test_getcwdb(self):
151 cwd = os.getcwdb()
152 self.assertIsInstance(cwd, bytes)
153 self.assertEqual(os.fsdecode(cwd), os.getcwd())
154
155
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os functions operating on os_helper.TESTFN.

    setUp() and tearDown() are the same function: both remove TESTFN when it
    exists, so every test starts and ends without it.
    """

    def setUp(self):
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # The reference count of the path must be unchanged after a
        # failing os.rename() call.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or a write fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: the file is closed before
        # it is unlinked.
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Use a separate name so the handle registered for cleanup
            # above is not shadowed.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed before
        # it is unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Use a separate name so the handle registered for cleanup
            # above is not shadowed.
            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200379
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the result objects returned by os.stat() and os.statvfs().

    setUp() creates os_helper.TESTFN containing the three bytes b"ABC" and
    registers its removal.
    """

    def setUp(self):
        self.fname = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        # Check the os.stat() result for *fname*, which must be 3 bytes long.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The attribute is compared with the indexed item
                    # after truncation to an integer.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the cases above, success is tolerated here --
        # presumably because stat_result may accept extra platform-specific
        # fields; confirm before tightening.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): success is tolerated here, as in the original test;
        # confirm whether a TypeError should be required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = os_helper.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(os_helper.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
607
Victor Stinner47aacc82015-06-12 17:26:23 +0200608
609class UtimeTests(unittest.TestCase):
610 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800611 self.dirname = os_helper.TESTFN
Victor Stinner47aacc82015-06-12 17:26:23 +0200612 self.fname = os.path.join(self.dirname, "f1")
613
Hai Shi0c4f0f32020-06-30 21:46:31 +0800614 self.addCleanup(os_helper.rmtree, self.dirname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200615 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100616 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200617
Victor Stinner47aacc82015-06-12 17:26:23 +0200618 def support_subsecond(self, filename):
619 # Heuristic to check if the filesystem supports timestamp with
620 # subsecond resolution: check if float and int timestamps are different
621 st = os.stat(filename)
622 return ((st.st_atime != st[7])
623 or (st.st_mtime != st[8])
624 or (st.st_ctime != st[9]))
625
626 def _test_utime(self, set_time, filename=None):
627 if not filename:
628 filename = self.fname
629
630 support_subsecond = self.support_subsecond(filename)
631 if support_subsecond:
632 # Timestamp with a resolution of 1 microsecond (10^-6).
633 #
634 # The resolution of the C internal function used by os.utime()
635 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
636 # test with a resolution of 1 ns requires more work:
637 # see the issue #15745.
638 atime_ns = 1002003000 # 1.002003 seconds
639 mtime_ns = 4005006000 # 4.005006 seconds
640 else:
641 # use a resolution of 1 second
642 atime_ns = 5 * 10**9
643 mtime_ns = 8 * 10**9
644
645 set_time(filename, (atime_ns, mtime_ns))
646 st = os.stat(filename)
647
648 if support_subsecond:
649 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
650 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
651 else:
652 self.assertEqual(st.st_atime, atime_ns * 1e-9)
653 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
654 self.assertEqual(st.st_atime_ns, atime_ns)
655 self.assertEqual(st.st_mtime_ns, mtime_ns)
656
657 def test_utime(self):
658 def set_time(filename, ns):
659 # test the ns keyword parameter
660 os.utime(filename, ns=ns)
661 self._test_utime(set_time)
662
663 @staticmethod
664 def ns_to_sec(ns):
665 # Convert a number of nanosecond (int) to a number of seconds (float).
666 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
667 # issue, os.utime() rounds towards minus infinity.
668 return (ns * 1e-9) + 0.5e-9
669
670 def test_utime_by_indexed(self):
671 # pass times as floating point seconds as the second indexed parameter
672 def set_time(filename, ns):
673 atime_ns, mtime_ns = ns
674 atime = self.ns_to_sec(atime_ns)
675 mtime = self.ns_to_sec(mtime_ns)
676 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
677 # or utime(time_t)
678 os.utime(filename, (atime, mtime))
679 self._test_utime(set_time)
680
681 def test_utime_by_times(self):
682 def set_time(filename, ns):
683 atime_ns, mtime_ns = ns
684 atime = self.ns_to_sec(atime_ns)
685 mtime = self.ns_to_sec(mtime_ns)
686 # test the times keyword parameter
687 os.utime(filename, times=(atime, mtime))
688 self._test_utime(set_time)
689
@unittest.skipUnless(
    os.utime in os.supports_follow_symlinks,
    "follow_symlinks support for utime required for this test.")
def test_utime_nofollow_symlinks(self):
    def set_time_nofollow(filename, ns):
        # follow_symlinks=False exercises utimensat(timespec)
        # or lutimes(timeval)
        os.utime(filename, ns=ns, follow_symlinks=False)

    self._test_utime(set_time_nofollow)
699
@unittest.skipUnless(os.utime in os.supports_fd,
                     "fd support for utime required for this test.")
def test_utime_fd(self):
    def set_time_via_fd(filename, ns):
        # Open the file unbuffered in binary write mode and pass its
        # descriptor instead of its name, to exercise futimens(timespec)
        # or futimes(timeval).
        with open(filename, 'wb', 0) as stream:
            os.utime(stream.fileno(), ns=ns)

    self._test_utime(set_time_via_fd)
709
@unittest.skipUnless(os.utime in os.supports_dir_fd,
                     "dir_fd support for utime required for this test.")
def test_utime_dir_fd(self):
    def set_time_via_dir_fd(filename, ns):
        parent, basename = os.path.split(filename)
        parent_fd = os.open(parent, os.O_RDONLY)
        try:
            # The file is addressed by its base name relative to the open
            # parent directory, to exercise utimensat(timespec) or
            # futimesat(timeval).
            os.utime(basename, dir_fd=parent_fd, ns=ns)
        finally:
            os.close(parent_fd)

    self._test_utime(set_time_via_dir_fd)
722
def test_utime_directory(self):
    # Same check as test_utime(), but the target is self.dirname, so
    # os.utime() is called on a directory rather than on a regular file.
    def set_dir_time(filename, ns):
        os.utime(filename, ns=ns)

    self._test_utime(set_dir_time, filename=self.dirname)
728
def _test_utime_current(self, set_time):
    """Check that set_time() stamps self.fname with the current time.

    set_time is called with self.fname as its only argument.  Afterwards
    the file's st_mtime has to lie within a tolerance of the value of
    time.time() sampled just before the call.
    """
    # Sample the system clock first
    current = time.time()

    # Have the callback set the timestamp to the current system clock
    set_time(self.fname)

    # Without sub-second timestamps, allow a whole second.  Otherwise
    # tolerate 50 ms: on Windows the usual resolution of time.time() is
    # 15.6 ms and slow Windows buildbots need more room (bpo-30649);
    # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms, so the same
    # 50 ms is used on the other platforms as well.
    delta = 0.050 if self.support_subsecond(self.fname) else 1.0

    st = os.stat(self.fname)
    dt = st.st_mtime - current
    msg = "st_time=%r, current=%r, dt=%r" % (st.st_mtime, current, dt)
    self.assertAlmostEqual(st.st_mtime, current, delta=delta, msg=msg)
750
def test_utime_current(self):
    # os.utime() called with only a path sets the timestamps to the
    # current time (the "new" way of asking for it).
    def set_time(filename):
        # Use the path handed in by _test_utime_current() instead of
        # silently ignoring the parameter and reaching for self.fname.
        os.utime(filename)

    self._test_utime_current(set_time)
756
def test_utime_current_old(self):
    # Passing None explicitly as the times argument is the old way of
    # asking os.utime() to use the current time.
    def set_time(filename):
        # Use the path handed in by _test_utime_current() instead of
        # silently ignoring the parameter and reaching for self.fname.
        os.utime(filename, None)

    self._test_utime_current(set_time)
762
def get_file_system(self, path):
    """Return the filesystem name of the volume holding *path*.

    Only implemented for win32, where the root of the drive containing
    *path* is queried with GetVolumeInformationW().  Returns None on any
    other platform or when the query fails (filesystem unknown).
    """
    if sys.platform != 'win32':
        return None

    import ctypes

    drive = os.path.splitdrive(os.path.abspath(path))[0]
    root = drive + '\\'
    name_buf = ctypes.create_unicode_buffer("", 100)
    ok = ctypes.windll.kernel32.GetVolumeInformationW(
        root, None, 0,
        None, None, None,
        name_buf, len(name_buf))
    if not ok:
        # the filesystem is unknown
        return None
    return name_buf.value
775
def test_large_time(self):
    # Many filesystems are limited to the year 2038.  The test is known to
    # pass on NTFS at least, so it is skipped everywhere else.
    filesystem = self.get_file_system(self.dirname)
    if filesystem != "NTFS":
        self.skipTest("requires NTFS")

    far_future = 5000000000   # some day in 2128
    os.utime(self.fname, (far_future, far_future))
    st = os.stat(self.fname)
    self.assertEqual(st.st_mtime, far_future)
785
def test_utime_invalid_arguments(self):
    path = self.fname

    # seconds and nanoseconds parameters are mutually exclusive
    with self.assertRaises(ValueError):
        os.utime(path, (5, 5), ns=(5, 5))

    # A list, a 1-tuple and a 3-tuple are each expected to be rejected
    # with TypeError, first as the times argument and then as ns.
    malformed = ([5, 5], (5,), (5, 5, 5))
    for bad_times in malformed:
        with self.assertRaises(TypeError):
            os.utime(path, bad_times)
    for bad_ns in malformed:
        with self.assertRaises(TypeError):
            os.utime(path, ns=bad_ns)

    # Features that the platform lacks must be refused explicitly.
    if os.utime not in os.supports_follow_symlinks:
        with self.assertRaises(NotImplementedError):
            os.utime(path, (5, 5), follow_symlinks=False)
    if os.utime not in os.supports_fd:
        with open(path, 'wb', 0) as fp, self.assertRaises(TypeError):
            os.utime(fp.fileno(), (5, 5))
    if os.utime not in os.supports_dir_fd:
        with self.assertRaises(NotImplementedError):
            os.utime(path, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200813
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash when utime() receives a bad ns
    # argument: here the first item of ns is an object whose __divmod__
    # returns something other than a 2-tuple, and TypeError is expected.
    def make_bad_int(divmod_ret_val):
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(make_bad_int(bogus_result), 1))
829
Victor Stinner47aacc82015-06-12 17:26:23 +0200830
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000831from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000832
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the str environment (and the bytes one where supported)
        # so that tearDown() can restore it, then install the reference
        # entries used by the mapping protocol tests.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        # Put back exactly what setUp() saved.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        # A variable set through update() has to be visible to a shell
        # started from this process.
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
            self.assertEqual(popen.read().strip(), "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as popen:
            it = iter(popen)
            for expected_line in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected_line)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join('{!r}: {!r}'.format(name, content)
                          for name, content in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(pairs))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        # Temporarily swap os.environ for a plain dict to check that
        # get_exec_path() defaults to os.environ.
        original_environ = os.environ
        try:
            os.environ = dict(fake_env)
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                     ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),))
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def seen_by_child():
            # repr() of the variable as reported by a fresh interpreter
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(seen_by_child(), repr(value))

            os.unsetenv(name)
            self.assertEqual(seen_by_child(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        rejected = (OSError, ValueError)
        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
            self.assertRaises(rejected, os.putenv, name, "value")
            self.assertRaises(rejected, os.unsetenv, name)

        if sys.platform == "win32":
            # On Windows, an environment variable string ("name=value"
            # string) is limited to 32,767 characters
            longstr = 'x' * 32_768
            self.assertRaises(ValueError, os.putenv, longstr, "1")
            self.assertRaises(ValueError, os.putenv, "X", longstr)
            self.assertRaises(ValueError, os.unsetenv, longstr)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup: the KeyError carries the very key object that was used
        # and has its context suppressed.
        with self.assertRaises(KeyError) as lookup:
            os.environ[missing]
        self.assertIs(lookup.exception.args[0], missing)
        self.assertTrue(lookup.exception.__suppress_context__)

        # Deletion: same expectations.
        with self.assertRaises(KeyError) as deletion:
            del os.environ[missing]
        self.assertIs(deletion.exception.args[0], missing)
        self.assertTrue(deletion.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over *collection*, add a key to os.environ, then
        # advance the iterator once more over the modified mapping.
        new_key = "__new_key__"
        iterator = iter(collection)

        next(iterator)  # start iteration

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Compare *expected* with what a shell started from this process
        # prints for $var; nothing is checked when no shell is available.
        if unix_shell and os.path.exists(unix_shell):
            with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
                value = popen.read().strip()

            self.assertEqual(expected, value)

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        actual = os.environ | new_vars_dict
        self.assertDictEqual(expected, actual)
        self.assertEqual('3', actual[overridden_key])

        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))

        # The process environment itself is left as it was.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        os.environ |= new_vars_dict
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        # The in-place update is reflected in the process environment.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        os_environ_copy = os.environ.copy()

        # A non-str key and then a non-str value are each expected to
        # raise TypeError.
        for invalid_dict in ({1: '_A_'}, {'_A_': 1}):
            with self.assertRaises(TypeError):
                os.environ |= invalid_dict

        # Check nothing was added.
        self.assertEqual(os_environ_copy, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = {**os.environ, **dict(new_vars_items)}

        os.environ |= new_vars_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        # With os.environ on the right-hand side its values take precedence.
        expected = {**new_vars_dict, **os.environ}

        actual = new_vars_dict | os.environ
        self.assertDictEqual(expected, actual)
        self.assertEqual(original_value, actual[overridden_key])

        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))

        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1124
Victor Stinner6d101392013-04-14 16:35:04 +02001125
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper hiding the minor differences between os.walk and os.fwalk,
    # so that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        try:
            # os.walk() spells the option "followlinks"
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(os_helper.rmtree, os_helper.TESTFN)

        # Tree built below:
        # TESTFN/
        #   TEST1/              a file kid and two directory kids
        #     tmp1
        #     SUB1/             a file kid and a directory kid
        #       tmp2
        #       SUB11/          no kids
        #     SUB2/             a file kid, a directory kid and symlinks
        #       tmp3
        #       SUB21/          made unreadable with chmod(0)
        #         tmp3          (the code calls this path tmp5_path)
        #       link/           a symlink to TEST2 (absolute path)
        #       broken_link     -> 'broken'
        #       broken_link2    -> tmp3/broken
        #       broken_link3    -> SUB21/tmp5
        #   TEST2/
        #     tmp4              a lone file
        # The symlinks are only created when os_helper.can_symlink().
        self.walk_path = join(os_helper.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        self.link_path = join(sub2_path, "link")
        t2_path = join(os_helper.TESTFN, "TEST2")

        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
        tmp5_path = join(sub21_path, "tmp3")

        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create the directories.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        # Create the files; mode "x" fails if one already exists.
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x", encoding='utf-8') as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # self.sub2_tree is the (root, dirs, files) entry expected for SUB2.
        if os_helper.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Try to make SUB21 unreadable.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # It worked: restore the permissions at cleanup time.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # The directory can still be listed: remove SUB21 from the tree
            # and from the expected entry.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        entries = list(self.walk(self.walk_path))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        entries[0][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(entries[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs list that was just
                # appended to visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        entries = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(entries), 4, entries)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        entries[3][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not os_helper.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walker = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walker:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walker = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walker)
        self.assertEqual(errors, [])

        # Rename SUB1 after the top directory has been listed but before
        # the walk gets a chance to descend into it.
        victim = 'SUB1'
        victim_path = os.path.join(root, victim)
        renamed_path = os.path.join(root, victim + '.new')
        os.rename(victim_path, renamed_path)
        try:
            roots = [r for r, d, f in walker]
            self.assertTrue(errors)
            self.assertNotIn(victim_path, roots)
            self.assertNotIn(renamed_path, roots)
            for other in dirs:
                if other != victim:
                    self.assertIn(os.path.join(root, other), roots)
        finally:
            os.rename(renamed_path, victim_path)

    def test_walk_many_open_files(self):
        depth = 30
        base = os.path.join(os_helper.TESTFN, 'deep')
        current = os.path.join(base, *(['d']*depth))
        os.makedirs(current)

        # Advance 100 bottom-up walks in lockstep: the deepest directory
        # comes first and has no subdirectory.
        walkers = [self.walk(base, topdown=False) for j in range(100)]
        for level in range(depth + 1):
            expected = (current, ['d'] if level else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.dirname(current)

        # Same with 100 top-down walks, starting from base.
        walkers = [self.walk(base, topdown=True) for j in range(100)]
        current = base
        for level in range(depth + 1):
            expected = (current, ['d'] if level < depth else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.join(current, 'd')
1320
Charles-François Natali7372b062012-02-05 15:15:38 +01001321
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk() to the (root, dirs, files) triples that the
        # inherited tests expect by dropping the directory descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single indirection point for the function under test.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of topdown and follow-symlinks, os.walk() is
        run with walk_kwargs and self.fwalk() with fwalk_kwargs; each root
        yielded by fwalk() must have been yielded by walk() with the same
        sets of directory and file names.  The dicts passed in are copied,
        not modified.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # the two functions spell the symlink option differently
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': os_helper.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the cleanup must not
        # hide the original error behind an unbound "fd" name.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': os_helper.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = os_helper.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(os_helper.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1392
1393
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Run os.walk() on a bytes path, yielding str (root, dirs, files).

        After each yield, the (possibly modified) str lists are encoded
        back into the bytes lists that os.walk() produced.
        """
        # os.walk() calls the option 'followlinks', not 'follow_symlinks'.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for broot, bdirs, bfiles in os.walk(encoded_top, **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Write the caller's edits back in place.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1406
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on a bytes path, yielding str names plus the fd.

        After each yield, the (possibly modified) str lists are encoded
        back into the bytes lists that os.fwalk() produced.
        """
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Write the caller's edits back in place.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1418
Charles-François Natali7372b062012-02-05 15:15:38 +01001419
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The leaf gets the requested mode; the intermediate
                # directory is expected to be 0o777 masked by the umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when an assertion
        # below fails, so a failure here cannot leak into later tests.
        with os_helper.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with os_helper.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file in a finally clause (not addCleanup, which runs
        # after tearDown): tearDown() calls os.removedirs() and would choke
        # on a leftover regular file named 'dir1'.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we walk upwards until we find the
        # deepest directory that exists (stopping at TESTFN itself).
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1512
Andrew Svetlov405faed2012-12-25 12:18:09 +02001513
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), applied to a TESTFN directory shared by the class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = os_helper.TESTFN
        st = os.stat(target)
        uid, gid = st.st_uid, st.st_gid
        # Non-integer numbers must be rejected for both uid and gid.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        target = os_helper.TESTFN
        uid = os.stat(target).st_uid
        # Switch the group to each of the first two groups in turn.
        for wanted_gid in groups[:2]:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        target = os_helper.TESTFN
        gid = os.stat(target).st_gid
        # Switch the owner to each of the first two known users in turn.
        for wanted_uid in all_users[:2]:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        # Both calls share one assertRaises block, so it is enough for
        # either of them to fail.  NOTE(review): presumably the first call
        # may succeed when uid_1 is the current user -- confirm.
        with self.assertRaises(PermissionError):
            os.chown(os_helper.TESTFN, uid_1, gid)
            os.chown(os_helper.TESTFN, uid_2, gid)
R David Murrayf2ad1732014-12-25 18:36:56 -05001572
1573
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_chain(self):
        """Create TESTFN/dira/dirb and return the (dira, dirb) paths."""
        dira = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory in the chain is empty, so all of them go away.
        dira, dirb = self._make_chain()
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertFalse(os.path.exists(dira))
        self.assertFalse(os.path.exists(os_helper.TESTFN))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(os_helper.TESTFN))

    def test_remove_nothing(self):
        # A file in the leaf directory makes removedirs() fail outright.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self.assertTrue(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(os_helper.TESTFN))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001613
1614
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        # Writes are accepted (unbuffered, so they really reach the device);
        # the with statement closes the file, no explicit close() needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reads always hit end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001622
Andrew Svetlov405faed2012-12-25 12:18:09 +02001623
class URandomTests(unittest.TestCase):
    """Tests for os.urandom(): result length, type and variability."""

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* random bytes produced by a child interpreter."""
        lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        result = assert_python_ok('-c', '\n'.join(lines))
        # Index 1 of the result is the child's stdout.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001653
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001654
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        data = os.getrandom(16)
        self.assertIsInstance(data, bytes)
        self.assertEqual(len(data), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists.
        # BlockingIOError is tolerated: system urandom is not initialized yet
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1696
1697
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of these build-time configuration variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001704
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter because it manipulates
    process-wide state (the fd limit, or the open file descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a clean exit of the child is checked; its output is unused.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        # Two 4-byte reads that must differ from each other ...
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not repeat the first run's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1784
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001785
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced with it as well.
    The context manager yields the list of recorded calls; os.execv,
    os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups fails, nothing has been patched yet and the finally clause
    # must not run with unbound names (which would mask the real error).
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1818
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family, mostly error paths."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument lists and an empty first argument are rejected.
        for arglist in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', arglist)

    def test_execvpe_with_bad_arglist(self):
        for arglist, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', arglist, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check os._execvpe() against the mocked execv()/execve().

        *test_type* is bytes or str and selects the type of the program
        name used for the calls.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Everywhere except Windows, the mock is expected to receive
            # the encoded (bytes) path.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            # A winerror of 0 is the one value that must not be reported.
            self.assertTrue(e.winerror is None or e.winerror != 0)
        else:
            self.fail('No OSError raised')
1923
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001924
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing TESTFN."""

    def setUp(self):
        # Every test relies on TESTFN not existing beforehand.
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            pass  # expected: nothing is in the way
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # Create TESTFN as a regular file ("x" fails if it already exists);
        # mkdir() on the same name must then fail while the file is open.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001959
Victor Stinnere77c9742016-03-25 10:28:23 +01001960
class TestInvalidFD(unittest.TestCase):
    # Each test calls an os function with the descriptor returned by
    # os_helper.make_bad_fd().  Most go through check(), which requires an
    # OSError whose errno is EBADF.

    # Functions taking the descriptor as their only argument; a test method
    # is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately not in the list because it doesn't raise an
    # exception on some platforms.

    def get_single(f):
        # Build the test method for the os function named *f*.  The
        # generated test silently does nothing when os lacks that function.
        def helper(self):
            try:
                func = getattr(os, f)
            except AttributeError:
                return
            self.check(func)
        return helper

    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF.

        The test fails if the call returns without raising OSError.
        """
        bad_fd = os_helper.make_bad_fd()
        try:
            f(bad_fd, *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False instead of raising.
        bad_fd = os_helper.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        first = os_helper.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).  The scan stops at the first offset
        # for which fstat() succeeds.
        for offset in range(10):
            try:
                os.fstat(first + offset)
            except OSError:
                continue
            break
        if offset < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(first, first + offset - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based entry point are exercised
        # with the bad descriptor.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buffers = [bytearray(10)]
        self.check(os.readv, buffers)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
2059
Brian Curtin1b9df392010-11-24 20:24:31 +00002060
class LinkTests(unittest.TestCase):
    # Tests for os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = os_helper.TESTFN
        self.file2 = os_helper.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        leftovers = [name for name in (self.file1, self.file2)
                     if os.path.exists(name)]
        for name in leftovers:
            os.unlink(name)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it to *file2*, check both are one file.

        The test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with open(file1, "r") as src, open(file2, "r") as dst:
            same = os.path.sameopenfile(src.fileno(), dst.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoded_first = bytes(self.file1, sys.getfilesystemencoding())
        encoded_second = bytes(self.file2, sys.getfilesystemencoding())
        self._test_link(encoded_first, encoded_second)

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # The attributes are updated so that tearDown removes these files.
        self.file1 = self.file1 + "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
2097
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Argument validation and permission checks for the os.set*uid() and
    # os.set*gid() family.  Each test checks three things:
    #   * setting id 0 raises OSError when the process is not running as
    #     uid 0 (the gid variants additionally require HAVE_WHEEL_GROUP to
    #     be false),
    #   * a non-integer argument raises TypeError,
    #   * a value of 1 << 32 raises OverflowError.

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # The TypeError check must target seteuid(), the function under
        # test here; setegid() has its own test below.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Both argument positions are validated independently.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Both argument positions are validated independently.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002165
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # setUp creates a directory holding one empty file per encodable test
    # name; the tests then access those files through their decoded (str)
    # names.

    def setUp(self):
        # Use the first of these names that is set as the directory name.
        self.dir = (os_helper.TESTFN_UNENCODABLE
                    or os_helper.TESTFN_NONASCII
                    or os_helper.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [os_helper.TESTFN_UNICODE]
        if os_helper.TESTFN_UNENCODABLE:
            candidates.append(os_helper.TESTFN_UNENCODABLE)
        if os_helper.TESTFN_NONASCII:
            candidates.append(os_helper.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        encoded_names = []
        for candidate in candidates:
            try:
                encoded_names.append(os.fsencode(candidate))
            except UnicodeEncodeError:
                pass
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in encoded_names:
                os_helper.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown does not run when setUp fails, so clean up here
            # before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must succeed; the content is irrelevant.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002237
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows, both with plain exit codes and with
    # console control events.

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit status.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 seconds apart, while the child is
        # alive.  The else branch runs when the loop ends without a break,
        # i.e. on running out of tries or on the child exiting early.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        The child runs win_console_handler.py and is given the tag name of
        a one-byte shared mmap.  This method waits for that byte to become
        1 before sending *event*.  *name* is only used in the failure
        message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  A
        # plain truthiness test would also treat exit status 0 as "still
        # running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2352
2353
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate os_helper.TESTFN with two sub-directories and two files,
        # and remember their names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(os_helper.TESTFN, dir_name))
            file_path = os.path.join(os_helper.TESTFN, file_name)
            with open(file_path, 'w', encoding='utf-8') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        extended = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)

        # bytes
        extended_bytes = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(sorted(os.listdir(extended_bytes)), expected_bytes)
Tim Golden781bbeb2013-10-25 20:24:06 +01002400
2401
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # Tests for os.readlink() with str, bytes and path-like arguments.

    # Link name and target, each in str and in bytes form.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths are equal after case normalisation.

        On Windows a leading '\\\\?\\' prefix is removed from either side
        before comparing.  Both str and bytes paths are accepted.
        """
        def strip_prefix(path):
            # Bad practice to blindly strip the prefix as it may be required
            # to correctly refer to the file, but we're only comparing paths
            # here.
            prefix = b'\\\\?\\' if isinstance(path, bytes) else '\\\\?\\'
            if path.startswith(prefix):
                return path[4:]
            return path

        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            left = strip_prefix(left)
            right = strip_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on a path that is not a symlink raises OSError.
        wrapped_target = FakePath(self.filelink_target)
        for path in (self.filelink_target, wrapped_target):
            with self.assertRaises(OSError):
                os.readlink(path)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        wrapped_link = FakePath(self.filelink)
        resolved = os.readlink(wrapped_link)
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2461
2462
Tim Golden781bbeb2013-10-25 20:24:06 +01002463@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002464@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002465class Win32SymlinkTests(unittest.TestCase):
2466 filelink = 'filelinktest'
2467 filelink_target = os.path.abspath(__file__)
2468 dirlink = 'dirlinktest'
2469 dirlink_target = os.path.dirname(filelink_target)
2470 missing_link = 'missing link'
2471
def setUp(self):
    # Preconditions: both link targets exist and none of the link names
    # used by the tests is already taken.  unittest assertions are used
    # instead of bare assert statements so the checks still run under
    # "python -O".
    self.assertTrue(os.path.exists(self.dirlink_target))
    self.assertTrue(os.path.exists(self.filelink_target))
    self.assertFalse(os.path.exists(self.dirlink))
    self.assertFalse(os.path.exists(self.filelink))
    self.assertFalse(os.path.exists(self.missing_link))
2478
def tearDown(self):
    # Remove whatever links the test created.  The missing-target link is
    # checked with lexists() because exists() is False for a link whose
    # target does not exist.
    cleanup_steps = (
        (os.path.exists, self.filelink, os.remove),
        (os.path.exists, self.dirlink, os.rmdir),
        (os.path.lexists, self.missing_link, os.remove),
    )
    for is_present, path, remove in cleanup_steps:
        if is_present(path):
            remove(path)
2486
def test_directory_link(self):
    # A symlink to a directory must look like both a directory and a link,
    # and must stat like its target.
    os.symlink(self.dirlink_target, self.dirlink)
    for predicate in (os.path.exists, os.path.isdir, os.path.islink):
        self.assertTrue(predicate(self.dirlink))
    self.check_stat(self.dirlink, self.dirlink_target)
2493
def test_file_link(self):
    # A symlink to a file must look like both a regular file and a link,
    # and must stat like its target.
    os.symlink(self.filelink_target, self.filelink)
    for predicate in (os.path.exists, os.path.isfile, os.path.islink):
        self.assertTrue(predicate(self.filelink))
    self.check_stat(self.filelink, self.filelink_target)
2500
def _create_missing_dir_link(self):
    'Create a "directory" link to a non-existent target'
    linkname = self.missing_link
    # Start from a clean slate if an earlier link is still present.
    if os.path.lexists(linkname):
        os.remove(linkname)
    target = r'c:\\target does not exist.29r3c740'
    # A unittest assertion is used instead of a bare assert statement so
    # the precondition is still checked under "python -O".
    self.assertFalse(os.path.exists(target))
    target_is_dir = True
    os.symlink(target, linkname, target_is_dir)
2510
def test_remove_directory_link_to_missing_target(self):
    # For compatibility with Unix, os.remove will check the
    # directory status and call RemoveDirectory if the symlink
    # was created with target_is_dir==True.
    self._create_missing_dir_link()
    dangling_link = self.missing_link
    os.remove(dangling_link)
2517
def test_isdir_on_directory_link_to_missing_target(self):
    # A directory link whose target is missing must not be reported as a
    # directory.
    self._create_missing_dir_link()
    reported_as_dir = os.path.isdir(self.missing_link)
    self.assertFalse(reported_as_dir)
Brian Curtind40e6f72010-07-08 21:39:08 +00002521
def test_rmdir_on_directory_link_to_missing_target(self):
    # os.rmdir() must succeed on a directory link whose target is missing.
    self._create_missing_dir_link()
    dangling_link = self.missing_link
    os.rmdir(dangling_link)
2525
def check_stat(self, link, target):
    """Check that stat() follows *link* to *target* and lstat() does not.

    The same two checks are made with the str form of *link* and with its
    os.fsencode()d bytes form.
    """
    for link_path in (link, os.fsencode(link)):
        followed = os.stat(link_path)
        self.assertEqual(followed, os.stat(target))
        self.assertNotEqual(os.lstat(link_path), os.stat(link_path))
Brian Curtind25aef52011-06-13 15:16:04 -05002533
def test_12084(self):
    # os.stat() on a relative symlink must resolve to the same file no
    # matter which directory is current.  Layout:
    #   level1/file1                  <- link target
    #   level1/level2/link            <- relative symlink to file1
    #   level1/level2/level3/
    top = os.path.abspath(os_helper.TESTFN)
    middle = os.path.join(top, "level2")
    bottom = os.path.join(middle, "level3")
    self.addCleanup(os_helper.rmtree, top)

    for directory in (top, middle, bottom):
        os.mkdir(directory)

    target_file = os.path.abspath(os.path.join(top, "file1"))
    create_file(target_file)

    saved_cwd = os.getcwd()
    try:
        os.chdir(middle)
        link_path = os.path.join(middle, "link")
        os.symlink(os.path.relpath(target_file), "link")
        self.assertIn("link", os.listdir(os.getcwd()))

        # Check os.stat calls from the same dir as the link
        self.assertEqual(os.stat(target_file), os.stat("link"))

        # Check os.stat calls with level1 (the parent of the link's
        # directory) and then level3 (a sub-directory of it) as the
        # current directory.
        for working_dir in (top, bottom):
            os.chdir(working_dir)
            self.assertEqual(os.stat(target_file),
                             os.stat(os.path.relpath(link_path)))
    finally:
        os.chdir(saved_cwd)
Brian Curtind25aef52011-06-13 15:16:04 -05002568
@unittest.skipUnless(
    os.path.lexists(r'C:\Users\All Users') and os.path.exists(r'C:\ProgramData'),
    'Test directories not found')
def test_29248(self):
    # os.symlink() calls CreateSymbolicLink, which creates the reparse
    # data buffer with the "PrintName" DOS path (e.g. "C:\") stored
    # first, at offset 0, followed by the "SubstituteName" NT path
    # (e.g. "\??\C:\").  The "All Users" link, on the other hand, seems
    # to have been created manually with an inverted order.
    resolved = os.readlink(r'C:\Users\All Users')
    points_at_program_data = os.path.samefile(resolved, r'C:\ProgramData')
    self.assertTrue(points_at_program_data)
2583
def test_buffer_overflow(self):
    # Older versions would have a buffer overflow when detecting
    # whether a link source was a directory. This test only ensures
    # that we no longer crash; it does not otherwise validate the
    # behavior.
    segment = 'X' * 27
    path = os.path.join(*[segment] * 10)
    test_cases = [
        # overflow with absolute src
        ('\\' + path, segment),
        # overflow dest with relative src
        (segment, path),
        # overflow when joining src
        (path[:180], path[:180]),
    ]
    for src, dest in test_cases:
        # Try the str form first, then the bytes form, since bytes
        # arguments take a separate code path.
        attempts = (
            (src, dest),
            (os.fsencode(src), os.fsencode(dest)),
        )
        for link_src, link_dest in attempts:
            try:
                os.symlink(link_src, link_dest)
            except FileNotFoundError:
                continue
            # The link was actually created: best-effort removal.
            with contextlib.suppress(OSError):
                os.remove(dest)
Brian Curtind40e6f72010-07-08 21:39:08 +00002618
def test_appexeclink(self):
    root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
    if not os.path.isdir(root):
        self.skipTest("test requires a WindowsApps directory")

    exe_names = fnmatch.filter(os.listdir(root), '*.exe')
    if not exe_names:
        self.skipTest("test requires an app execution alias")

    # testing the first one we see is sufficient
    alias = os.path.join(root, exe_names[0])
    if support.verbose:
        print()
        print("Testing with", alias)

    st = os.lstat(alias)
    # stat() and lstat() must agree, and the alias must not be reported
    # as a symlink even though it carries a reparse tag.
    self.assertEqual(st, os.stat(alias))
    self.assertFalse(stat.S_ISLNK(st.st_mode))
    self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2639
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # Name of the junction each test creates (relative to the cwd).
    junction = 'junctiontest'
    # Existing directory the junction points at: the one holding this file.
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use TestCase assertions rather than bare ``assert`` statements so
        # that these preconditions are still enforced under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        # Remove the junction if a test left it behind.
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        # stat() follows the junction while lstat() describes the junction
        # itself, so the two results must differ.
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2673
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Check that nt._getfinalpathname() and os.stat() do not change the
        # process handle count, whether they succeed or fail.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        # One DWORD is reused as the output slot for every query.
        handle_count = ctypes.wintypes.DWORD()

        def open_handles():
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = open_handles()

        # The device/volume names are expected to take the error path;
        # __file__ tests the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for name in filenames * 10:
            for func in (nt._getfinalpathname, os.stat):
                try:
                    func(name)
                except Exception:
                    # Failure is expected
                    pass

        handle_delta = open_handles() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002723
Hai Shi0c4f0f32020-06-30 21:46:31 +08002724@os_helper.skip_unless_symlink
Jason R. Coombs3a092862013-05-27 23:21:28 -04002725class NonLocalSymlinkTests(unittest.TestCase):
2726
2727 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002728 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002729 Create this structure:
2730
2731 base
2732 \___ some_dir
2733 """
2734 os.makedirs('base/some_dir')
2735
2736 def tearDown(self):
2737 shutil.rmtree('base')
2738
2739 def test_directory_link_nonlocal(self):
2740 """
2741 The symlink target should resolve relative to the link, not relative
2742 to the current directory.
2743
2744 Then, link base/some_link -> base/some_dir and ensure that some_link
2745 is resolved as a directory.
2746
2747 In issue13772, it was discovered that directory detection failed if
2748 the symlink target was not specified relative to the current
2749 directory, which was a defect in the implementation.
2750 """
2751 src = os.path.join('base', 'some_link')
2752 os.symlink('some_dir', src)
2753 assert os.path.isdir(src)
2754
2755
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # fsencode() of bytes and fsdecode() of str must hand the value
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(fn)
            except UnicodeEncodeError:
                # Name not representable in the filesystem encoding.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002769
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002770
Brett Cannonefb00c02012-02-29 18:31:31 -05002771
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding name
        # must come back and it must be one the codecs registry knows.
        tty_encoding = os.device_encoding(0)
        self.assertIsNotNone(tty_encoding)
        self.assertTrue(codecs.lookup(tty_encoding))
2785
2786
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        cmd = [sys.executable, '-c', 'import os; print(os.getppid())']
        completed = subprocess.run(cmd, stdout=subprocess.PIPE)
        # We are the parent of our subprocess
        self.assertEqual(int(completed.stdout), os.getpid())

    def check_waitpid(self, code, exitcode, callback=None):
        # Spawn "python -c <code>", optionally call callback(pid) while the
        # child is running, then check os.waitpid() and
        # os.waitstatus_to_exitcode() against the expected exit code.
        if sys.platform != 'win32':
            args = [sys.executable, '-c', code]
        else:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            args = [f'"{sys.executable}"', '-c', f'"{code}"']
        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)

        if callback is not None:
            callback(pid)

        # don't use support.wait_process() to test directly os.waitpid()
        # and os.waitstatus_to_exitcode()
        waited_pid, status = os.waitpid(pid, 0)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertEqual(waited_pid, pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitstatus_to_exitcode(self):
        exitcode = 23
        self.check_waitpid(f'import sys; sys.exit({exitcode})',
                           exitcode=exitcode)

        # A float is not an acceptable status value.
        self.assertRaises(TypeError, os.waitstatus_to_exitcode, 0.0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
        # with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitstatus_to_exitcode_windows(self):
        max_exitcode = 2 ** 32 - 1
        for exitcode in (0, 1, 5, max_exitcode):
            status = exitcode << 8
            self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)

        # invalid values
        self.assertRaises(ValueError,
                          os.waitstatus_to_exitcode, (max_exitcode + 1) << 8)
        self.assertRaises(OverflowError, os.waitstatus_to_exitcode, -1)

    # Skip the test on Windows
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
    def test_waitstatus_to_exitcode_kill(self):
        signum = signal.SIGKILL
        # The child sleeps long enough that it is still alive when killed.
        code = f'import time; time.sleep({support.LONG_TIMEOUT})'

        def kill_process(pid):
            os.kill(pid, signum)

        # A child terminated by a signal is reported as -signum.
        self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002857
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002858
class SpawnTests(unittest.TestCase):
    def create_args(self, *, with_env=False, use_bytes=False):
        # Write a script that exits with self.exitcode to TESTFN and return
        # the argv list ([interpreter, script]) that runs it.
        #
        # with_env:  also build self.env (a copy of os.environ plus a
        #            unique key stored in self.key) and make the script
        #            read that key, so it fails if the env is not passed.
        # use_bytes: return the arguments (and self.env, when one was
        #            built) encoded with os.fsencode().
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if with_env:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))
        else:
            code = 'import sys; sys.exit(%s)' % self.exitcode

        with open(filename, "w", encoding="utf-8") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # Only encode the environment when one was actually created;
            # previously this raised AttributeError without with_env.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # No argument at all, or an empty first argument, is rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        # Empty sequences and sequences whose first item is empty.
        for bad_argv in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0],
                              bad_argv)

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        for bad_argv in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0],
                              bad_argv, {})

    def _test_invalid_env(self, spawn):
        # Shared body for the spawnve/spawnvpe invalid-environment tests.
        # `spawn` is called as spawn(mode, path, args, env).
        args = [sys.executable, '-c', 'pass']

        # Each of these entries must either be rejected with ValueError or
        # make the spawn fail with exit code 127.
        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3034
Serhiy Storchaka77703942017-06-25 07:33:01 +03003035
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The login name must be a non-empty string.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
3044
3045
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; being refused with
            # EACCES is tolerated, any other error is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
3068
3069
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """asyncore-based TCP server that runs its event loop in its own thread.

    Each accepted connection is wrapped in a Handler, which greets the peer
    with b"220 ready\\r\\n" and accumulates whatever the peer sends.  The most
    recently created handler is available as `handler_instance`.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that buffers the received data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read but thrown away.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled instead of
            # letting asyncore swallow it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound (the port may have been 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until its loop is running."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop to exit and wait for the server thread to finish."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # Hold the lock through a context manager so that it is
            # released even if asyncore.loop() raises.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3153 raise
3154
3155
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003156@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3157class TestSendfile(unittest.TestCase):
3158
DATA = b"12345abcde" * 16 * 1024  # 160 KiB
# The headers/trailers arguments of sendfile() are only exercised on
# platforms other than Linux and Solaris/SunOS.
SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
    ("linux", "solaris", "sunos"))
requires_headers_trailers = unittest.skipUnless(
    SUPPORT_HEADERS_TRAILERS,
    'requires headers and trailers support')
requires_32b = unittest.skipUnless(
    sys.maxsize < 2**32,
    'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003167
@classmethod
def setUpClass(cls):
    # Keep the value returned by threading_setup(); tearDownClass hands it
    # back to threading_cleanup().
    cls.key = threading_helper.threading_setup()
    # File that every test in the class sends from.
    create_file(os_helper.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003172
@classmethod
def tearDownClass(cls):
    # Undo setUpClass: hand the saved threading state back, then remove
    # the shared data file.
    threading_helper.threading_cleanup(*cls.key)
    os_helper.unlink(os_helper.TESTFN)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003177
def setUp(self):
    # Fresh server on an OS-chosen port for every test.
    self.server = SendfileTestServer((socket_helper.HOST, 0))
    self.server.start()

    self.client = socket.socket()
    server_address = (self.server.host, self.server.port)
    self.client.connect(server_address)
    self.client.settimeout(1)
    # synchronize by waiting for "220 ready" response
    self.client.recv(1024)
    self.sockno = self.client.fileno()

    # Source file for the sendfile() calls, with its descriptor.
    self.file = open(os_helper.TESTFN, 'rb')
    self.fileno = self.file.fileno()
3189
def tearDown(self):
    # Close the source file first, then the client socket.
    for resource in (self.file, self.client):
        resource.close()
    # Tests that called server.wait() have already stopped the server.
    if self.server.running:
        self.server.stop()
    self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003196
def sendfile_wrapper(self, *args, **kwargs):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    Arguments are passed straight to os.sendfile(), whose result is
    returned.  The call is retried for as long as it fails with EAGAIN
    or EBUSY; any other OSError (ECONNRESET included) is propagated.
    """
    retry_errnos = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            return os.sendfile(*args, **kwargs)
        except OSError as err:
            if err.errno not in retry_errnos:
                # disconnected, or some other failure
                raise
            # otherwise: we have to retry send data
3213
def test_send_whole_file(self):
    # normal send
    chunk_size = 4096
    file_size = len(self.DATA)
    offset = 0
    total_sent = 0
    while total_sent < file_size:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, chunk_size)
        if not sent:
            break
        offset += sent
        total_sent += sent
        # Never more than requested, and the offset tracks the total
        # because sending started at offset 0.
        self.assertTrue(sent <= chunk_size)
        self.assertEqual(offset, total_sent)

    self.assertEqual(total_sent, file_size)
    # Close our side so the server sees the connection end, then wait
    # for it to finish before looking at what it received.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(len(received), file_size)
    self.assertEqual(received, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003235
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset
    chunk_size = 4096
    start = len(self.DATA) // 2
    remaining = len(self.DATA) - start
    offset = start
    total_sent = 0
    while total_sent < remaining:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, chunk_size)
        if not sent:
            break
        offset += sent
        total_sent += sent
        self.assertTrue(sent <= chunk_size)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    # Only the second half of the file should have arrived.
    expected = self.DATA[len(self.DATA) // 2:]
    self.assertEqual(total_sent, len(expected))
    self.assertEqual(len(received), len(expected))
    self.assertEqual(received, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003258
def test_offset_overflow(self):
    # Ask sendfile() to start 4096 bytes past the end of the file:
    # either nothing is sent or the call fails with EINVAL.
    beyond_eof = len(self.DATA) + 4096
    try:
        sent = os.sendfile(self.sockno, self.fileno, beyond_eof, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(sent, 0)

    # Whatever happened above, the server must not have received data.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
3275
def test_invalid_offset(self):
    # A negative offset must be rejected with OSError(EINVAL).
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    failure = caught.exception
    self.assertEqual(failure.errno, errno.EINVAL)
3280
def test_keywords(self):
    # Keyword arguments should be supported
    basic = dict(out_fd=self.sockno, in_fd=self.fileno,
                 offset=0, count=4096)
    os.sendfile(**basic)
    if not self.SUPPORT_HEADERS_TRAILERS:
        return
    # The extended signature is only exercised when the class flag says
    # headers/trailers are supported.
    os.sendfile(**basic, headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00003289
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003290 # --- headers / trailers tests
3291
@requires_headers_trailers
def test_headers(self):
    # Send two header chunks followed by file data and check that the
    # server receives headers + file contents (minus the final byte).
    total_sent = 0
    expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
    sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                       headers=[b"x" * 512, b"y" * 256])
    self.assertLessEqual(sent, 512 + 256 + 4096)
    total_sent += sent
    # NOTE(review): this assumes the first call transferred the full
    # first 4096 bytes of the file in addition to the headers -- confirm.
    offset = 4096
    while total_sent < len(expected_data):
        nbytes = min(len(expected_data) - total_sent, 4096)
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        self.assertLessEqual(sent, nbytes)
        total_sent += sent
        offset += sent

    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Compare the payload itself rather than hash(data): this cannot be
    # fooled by a hash collision, and matches the length-then-content
    # checks used by the other sendfile tests.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
3316
@requires_headers_trailers
def test_trailers(self):
    # Send the first five bytes of a small scratch file followed by two
    # trailer chunks, and check the concatenation seen by the server.
    scratch = os_helper.TESTFN + "2"
    contents = b"abcdef"

    self.addCleanup(os_helper.unlink, scratch)
    create_file(scratch, contents)

    with open(scratch, 'rb') as src:
        os.sendfile(self.sockno, src.fileno(), 0, 5,
                    trailers=[b"123456", b"789"])
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcde123456789")
3332
@requires_headers_trailers
@requires_32b
def test_headers_overflow_32bits(self):
    # Headers totalling 2**31 bytes must be rejected with EINVAL.
    self.server.handler_instance.accumulate = False
    with self.assertRaises(OSError) as caught:
        # 2**15 references to one 2**16-byte chunk: 2**31 bytes overall.
        oversized = [b"x" * 2**16] * 2**15
        os.sendfile(self.sockno, self.fileno, 0, 0, headers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
3341
@requires_headers_trailers
@requires_32b
def test_trailers_overflow_32bits(self):
    # Trailers totalling 2**31 bytes must be rejected with EINVAL.
    self.server.handler_instance.accumulate = False
    with self.assertRaises(OSError) as caught:
        # 2**15 references to one 2**16-byte chunk: 2**31 bytes overall.
        oversized = [b"x" * 2**16] * 2**15
        os.sendfile(self.sockno, self.fileno, 0, 0, trailers=oversized)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003350
@requires_headers_trailers
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                     'test needs os.SF_NODISKIO')
def test_flags(self):
    # Passing flags=os.SF_NODISKIO must be accepted; EBUSY and EAGAIN
    # are tolerated outcomes, any other OSError is a failure.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno not in tolerated:
            raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003361
3362
def supports_extended_attributes():
    """Return True if an xattr can actually be set on a scratch file.

    Returns False when os.setxattr is missing or when setting
    b"user.test" on a freshly created os_helper.TESTFN raises OSError.
    The scratch file is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        # "xb": exclusive creation, so an existing TESTFN is an error.
        with open(os_helper.TESTFN, "xb", 0) as scratch:
            try:
                os.setxattr(scratch.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        os_helper.unlink(os_helper.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07003377
3378
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run one full scenario against a scratch file using the supplied
        # callables.  `s` converts each attribute name to the type under
        # test; `kwargs` is forwarded to get/set/remove calls.
        target = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, target)
        create_file(target)

        def assert_fails_with(code, operation, *op_args):
            # `operation` on the scratch file must raise OSError(code).
            with self.assertRaises(OSError) as caught:
                operation(target, *op_args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # The attribute does not exist yet.
        assert_fails_with(errno.ENODATA, getxattr, s("user.test"))

        baseline = listxattr(target)
        self.assertIsInstance(baseline, list)

        # Create it with an empty value, then replace the value.
        setxattr(target, s("user.test"), b"", **kwargs)
        expected_names = set(baseline)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(target)), expected_names)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"")
        setxattr(target, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both fail.
        assert_fails_with(errno.EEXIST, setxattr,
                          s("user.test"), b"bye", os.XATTR_CREATE)
        assert_fails_with(errno.ENODATA, setxattr,
                          s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(target, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(target)), expected_names)
        removexattr(target, s("user.test"), **kwargs)

        # Once removed, the attribute is gone again.
        assert_fails_with(errno.ENODATA, getxattr, s("user.test"))

        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(target)), expected_names)
        self.assertEqual(getxattr(target, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        setxattr(target, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(target, s("user.test"), **kwargs), b"a"*1024)
        removexattr(target, s("user.test"), **kwargs)

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(target, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(target)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with encoded
        # names, removing the scratch file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3462
3463
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            if sys.platform != "win32" and e.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(
                ["stty", "size"], stderr=subprocess.DEVNULL, text=True
            )
            size = stty_output.split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        # stty prints the two numbers in the opposite order to the
        # tuple returned by os.get_terminal_size().
        rows, cols = size[0], size[1]
        expected = (int(cols), int(rows))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            if sys.platform != "win32" and e.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3511
3512
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    """Tests for os.memfd_create()."""

    def test_memfd_create(self):
        # Explicit MFD_CLOEXEC: descriptor is valid, non-inheritable
        # and writable.
        cloexec_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(cloexec_fd, -1)
        self.addCleanup(os.close, cloexec_fd)
        self.assertFalse(os.get_inheritable(cloexec_fd))
        with open(cloexec_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Default flags: the descriptor is still non-inheritable.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3528
3529
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the special unencodable/undecodable test names when the
        # helper module provides them.
        decoded = os_helper.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = os_helper.TESTFN
        encoded = os_helper.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(os_helper.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded), memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (filenames to try, function, *extra positional args).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_text, os.rename, "dst"),
                (only_text, os.replace, "dst"),
                (only_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that exist only on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((every, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                plain = isinstance(name, (str, bytes))
                try:
                    if plain:
                        func(name, *func_args)
                    else:
                        # Anything other than str/bytes is expected to
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as err:
                    # Identity, not equality: the same object must come back.
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3622
class CPUCountTests(unittest.TestCase):
    """Tests for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        # A known count must be a positive integer.
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3631
Victor Stinnerdaf45552013-08-28 00:53:59 +02003632
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os.

    The boolean checks consistently use assertFalse/assertTrue, as
    test_dup2 and test_dup_nul already did.
    """

    def test_get_set_inheritable(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

        os.set_inheritable(fd, True)
        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

        # clear FD_CLOEXEC flag
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags &= ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         0)

    def test_open(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        self.assertFalse(os.get_inheritable(rfd))
        self.assertFalse(os.get_inheritable(wfd))

    def test_dup(self):
        fd1 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd1)

        fd2 = os.dup(fd1)
        self.addCleanup(os.close, fd2)
        self.assertFalse(os.get_inheritable(fd2))

    def test_dup_standard_stream(self):
        fd = os.dup(1)
        self.addCleanup(os.close, fd)
        self.assertGreater(fd, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        fd1 = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, fd1)
        fd2 = os.dup(fd1)
        self.addCleanup(os.close, fd2)
        self.assertFalse(os.get_inheritable(fd2))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        # inheritable by default
        fd2 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd2)
        self.assertEqual(os.dup2(fd, fd2), fd2)
        self.assertTrue(os.get_inheritable(fd2))

        # force non-inheritable
        fd3 = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd3)
        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
        self.assertFalse(os.get_inheritable(fd3))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertFalse(os.get_inheritable(master_fd))
        self.assertFalse(os.get_inheritable(slave_fd))
3725
3726
class PathTConverterTests(unittest.TestCase):
    """Check how a selection of os functions convert their path argument."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = os_helper.TESTFN
        if os.name != 'nt':
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        else:
            # Bytes paths are not exercised on Windows.
            bytes_fspath = bytes_filename = None
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the fd, then unlink.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        candidates = [
            candidate
            for candidate in (str_filename, bytes_filename, str_fspath,
                              bytes_fspath)
            if candidate is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                for path in candidates:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object wrapping an int must be rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        # (wrapped value, type name expected in the error message)
        cases = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for wrapped, type_name in cases:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(wrapped))
3791
Brett Cannon3f9183b2016-08-26 14:44:48 -07003792
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip test for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # A freshly opened descriptor starts out blocking.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch to non-blocking and back, reading the flag each time.
        for wanted in (False, True):
            os.set_blocking(handle, wanted)
            self.assertEqual(os.get_blocking(handle), wanted)
3806
3807
Yury Selivanov97e2e062014-09-26 12:33:06 -04003808
class ExportsTests(unittest.TestCase):
    """Sanity check of os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3813
3814
class TestDirEntry(unittest.TestCase):
    """Tests for the os.DirEntry type itself."""

    def setUp(self):
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # The directory holds exactly one entry, the file just created.
        entry = list(os.scandir(self.path)).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Only the object is passed: the second positional parameter of
        # pickle.dumps() is `protocol`, so the value previously passed
        # there (create_file()'s result) did not belong in the call.
        # The module-level `pickle` import is used.
        self.assertRaises(TypeError, pickle.dumps, entry)
3831
3832
Victor Stinner6036e442015-03-08 01:58:04 +01003833class TestScandir(unittest.TestCase):
Hai Shi0c4f0f32020-06-30 21:46:31 +08003834 check_no_resource_warning = warnings_helper.check_no_resource_warning
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003835
def setUp(self):
    # Create an empty scratch directory and remember its str and bytes
    # paths; it is removed again when the test finishes.
    scratch_dir = os.path.realpath(os_helper.TESTFN)
    self.path = scratch_dir
    self.bytes_path = os.fsencode(scratch_dir)
    self.addCleanup(os_helper.rmtree, scratch_dir)
    os.mkdir(scratch_dir)
3841
def create_file(self, name="file.txt"):
    # Create `name` inside the scratch directory with content b'python'
    # and return its full path.  A bytes name is joined to the bytes
    # form of the directory so the result has the same type as `name`.
    if isinstance(name, bytes):
        parent = self.bytes_path
    else:
        parent = self.path
    filename = os.path.join(parent, name)
    # Inside the class body this resolves to the module-level
    # create_file() helper, not to this method.
    create_file(filename, b'python')
    return filename
3847
def get_entries(self, names):
    # Scan self.path, check that the sorted entry names equal `names`,
    # and return a {name: DirEntry} mapping.
    found = {}
    for dir_entry in os.scandir(self.path):
        found[dir_entry.name] = dir_entry
    self.assertEqual(sorted(found), names)
    return found
3853
def assert_stat_equal(self, stat1, stat2, skip_fields):
    # Compare two stat results.  When `skip_fields` is false they must
    # be equal as whole objects; otherwise every st_* attribute except
    # st_dev, st_ino and st_nlink is compared one by one.
    if not skip_fields:
        self.assertEqual(stat1, stat2)
        return

    ignored = ("st_dev", "st_ino", "st_nlink")
    for field in dir(stat1):
        if field.startswith("st_") and field not in ignored:
            self.assertEqual(getattr(stat1, field),
                             getattr(stat2, field),
                             (stat1, stat2, field))
3866
def test_uninstantiable(self):
    # The type of the iterator returned by os.scandir() cannot be
    # instantiated directly.  The context manager closes the iterator
    # even if the assertion fails.
    with os.scandir(self.path) as scandir_iter:
        self.assertRaises(TypeError, type(scandir_iter))
3871
def test_unpickable(self):
    # The iterator returned by os.scandir() must refuse to be pickled.
    self.create_file("file.txt")
    # The context manager closes the iterator even if the assertion
    # fails.
    with os.scandir(self.path) as scandir_iter:
        # Pass only the object.  The second positional parameter of
        # pickle.dumps() is `protocol`; passing the file name there
        # raised TypeError for the bogus protocol, so the test passed
        # regardless of whether the iterator was picklable.
        # The module-level `pickle` import is used.
        self.assertRaises(TypeError, pickle.dumps, scandir_iter)
3878
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
    # Cross-check one DirEntry against os.stat()/os.path for the same
    # path.
    # NOTE(review): `is_dir` and `is_file` are not used by any check
    # below; only `is_symlink` is consulted -- confirm this is intended.
    self.assertIsInstance(entry, os.DirEntry)
    self.assertEqual(entry.name, name)
    self.assertEqual(entry.path, os.path.join(self.path, name))
    self.assertEqual(entry.inode(),
                     os.stat(entry.path, follow_symlinks=False).st_ino)

    # Type predicates with symlinks followed.
    followed = os.stat(entry.path)
    self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
    self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
    self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

    # Type predicates without following symlinks.
    unfollowed = os.stat(entry.path, follow_symlinks=False)
    self.assertEqual(entry.is_dir(follow_symlinks=False),
                     stat.S_ISDIR(unfollowed.st_mode))
    self.assertEqual(entry.is_file(follow_symlinks=False),
                     stat.S_ISREG(unfollowed.st_mode))

    # On Windows ('nt') some stat fields are skipped in the comparison.
    on_nt = os.name == 'nt'
    self.assert_stat_equal(entry.stat(),
                           followed,
                           on_nt and not is_symlink)
    self.assert_stat_equal(entry.stat(follow_symlinks=False),
                           unfollowed,
                           on_nt)
3906
def test_attributes(self):
    """Check DirEntry attributes for a directory, a file and links to them."""
    link = hasattr(os, 'link')
    symlink = os_helper.can_symlink()

    dirname = os.path.join(self.path, "dir")
    os.mkdir(dirname)
    filename = self.create_file("file.txt")
    if link:
        try:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
    if symlink:
        os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                   target_is_directory=True)
        os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

    # One row per expected entry: (name, is_dir, is_file, is_symlink).
    expected = [
        ('dir', True, False, False),
        ('file.txt', False, True, False),
    ]
    if link:
        expected.append(('link_file.txt', False, True, False))
    if symlink:
        expected.append(('symlink_dir', True, False, True))
        expected.append(('symlink_file.txt', False, True, True))

    entries = self.get_entries([row[0] for row in expected])
    for entry_name, is_dir, is_file, is_symlink in expected:
        self.check_entry(entries[entry_name], entry_name,
                         is_dir, is_file, is_symlink)
3947
def get_entry(self, name):
    """Return the only DirEntry of the test directory.

    The directory is scanned as bytes when *name* is bytes and as str
    otherwise.  Asserts that exactly one entry exists and that it is
    called *name*.
    """
    if isinstance(name, bytes):
        directory = self.bytes_path
    else:
        directory = self.path
    found = [*os.scandir(directory)]
    self.assertEqual(len(found), 1)

    only = found[0]
    self.assertEqual(only.name, name)
    return only
3956
def create_file_entry(self, name='file.txt'):
    """Create file *name* in the test directory and return its DirEntry."""
    created = self.create_file(name=name)
    basename = os.path.basename(created)
    return self.get_entry(basename)
3960
def test_current_directory(self):
    """scandir() without an argument lists the current directory."""
    filename = self.create_file()
    expected_names = [os.path.basename(filename)]
    old_dir = os.getcwd()
    try:
        os.chdir(self.path)

        # call scandir() without parameter: it must list the content
        # of the current directory
        entries = {item.name: item for item in os.scandir()}
        self.assertEqual(sorted(entries), expected_names)
    finally:
        # Always go back, whatever the outcome of the check.
        os.chdir(old_dir)
3974
def test_repr(self):
    """repr() of a DirEntry shows the entry name."""
    expected = "<DirEntry 'file.txt'>"
    self.assertEqual(repr(self.create_file_entry()), expected)
3978
def test_fspath_protocol(self):
    """os.fspath() of a DirEntry is the entry's full str path."""
    entry = self.create_file_entry()
    expected = os.path.join(self.path, 'file.txt')
    self.assertEqual(os.fspath(entry), expected)
3982
def test_fspath_protocol_bytes(self):
    """os.fspath() of a DirEntry from a bytes scan is a bytes path."""
    bytes_filename = os.fsencode('bytesfile.txt')
    bytes_entry = self.create_file_entry(name=bytes_filename)

    result = os.fspath(bytes_entry)
    self.assertIsInstance(result, bytes)
    expected = os.path.join(os.fsencode(self.path), bytes_filename)
    self.assertEqual(result, expected)
3990
def test_removed_dir(self):
    """Query a DirEntry after the directory it refers to was removed."""
    target = os.path.join(self.path, 'dir')

    os.mkdir(target)
    entry = self.get_entry('dir')
    os.rmdir(target)

    on_windows = (os.name == 'nt')

    # On POSIX, is_dir() result depends if scandir() filled d_type or not
    if on_windows:
        self.assertTrue(entry.is_dir())
    self.assertFalse(entry.is_file())
    self.assertFalse(entry.is_symlink())
    if not on_windows:
        self.assertGreater(entry.inode(), 0)
        self.assertRaises(FileNotFoundError, entry.stat)
        self.assertRaises(FileNotFoundError, entry.stat,
                          follow_symlinks=False)
    else:
        self.assertRaises(FileNotFoundError, entry.inode)
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
4012
def test_removed_file(self):
    """Query a DirEntry after the file it refers to was removed."""
    entry = self.create_file_entry()
    os.unlink(entry.path)

    on_windows = (os.name == 'nt')

    self.assertFalse(entry.is_dir())
    # On POSIX, is_file() result depends if scandir() filled d_type or not
    if on_windows:
        self.assertTrue(entry.is_file())
    self.assertFalse(entry.is_symlink())
    if not on_windows:
        self.assertGreater(entry.inode(), 0)
        self.assertRaises(FileNotFoundError, entry.stat)
        self.assertRaises(FileNotFoundError, entry.stat,
                          follow_symlinks=False)
    else:
        self.assertRaises(FileNotFoundError, entry.inode)
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
4031
def test_broken_symlink(self):
    """Query the DirEntry of a symlink whose target was removed."""
    if not os_helper.can_symlink():
        # skipTest() raises, so nothing below runs in that case.
        self.skipTest('cannot create symbolic link')

    filename = self.create_file("file.txt")
    link_path = os.path.join(self.path, "symlink.txt")
    os.symlink(filename, link_path)
    entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
    os.unlink(filename)

    self.assertGreater(entry.inode(), 0)
    self.assertFalse(entry.is_dir())
    self.assertFalse(entry.is_file())  # broken symlink returns False
    self.assertFalse(entry.is_dir(follow_symlinks=False))
    self.assertFalse(entry.is_file(follow_symlinks=False))
    self.assertTrue(entry.is_symlink())
    self.assertRaises(FileNotFoundError, entry.stat)
    # don't fail
    entry.stat(follow_symlinks=False)
4052
def test_bytes(self):
    """Scanning a bytes path yields entries with bytes name and path."""
    self.create_file("file.txt")

    found = list(os.scandir(os.fsencode(self.path)))
    self.assertEqual(len(found), 1, found)
    entry = found[0]

    expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))
    self.assertEqual(entry.name, b'file.txt')
    self.assertEqual(entry.path, expected_path)
4064
def test_bytes_like(self):
    """bytearray and memoryview paths warn and yield bytes entries."""
    self.create_file("file.txt")

    raw_path = os.fsencode(self.path)
    expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))
    for cls in (bytearray, memoryview):
        path_bytes = cls(raw_path)
        with self.assertWarns(DeprecationWarning):
            found = list(os.scandir(path_bytes))
        self.assertEqual(len(found), 1, found)
        entry = found[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path, expected_path)
        # Whatever the input type, the results must be exact bytes.
        self.assertIs(type(entry.name), bytes)
        self.assertIs(type(entry.path), bytes)
4080
@unittest.skipUnless(os.listdir in os.supports_fd,
                     'fd support for listdir required for this test.')
def test_fd(self):
    """Scan a directory through an open file descriptor."""
    self.assertIn(os.scandir, os.supports_fd)
    self.create_file('file.txt')
    expected_names = ['file.txt']
    if os_helper.can_symlink():
        os.symlink('file.txt', os.path.join(self.path, 'link'))
        expected_names.append('link')

    fd = os.open(self.path, os.O_RDONLY)
    try:
        with os.scandir(fd) as it:
            entries = list(it)
        names = [item.name for item in entries]
        self.assertEqual(sorted(names), expected_names)
        self.assertEqual(names, os.listdir(fd))
        for item in entries:
            # The path of such an entry is just its name.
            self.assertEqual(item.path, item.name)
            self.assertEqual(os.fspath(item), item.name)
            self.assertEqual(item.is_symlink(), item.name == 'link')
            if os.stat not in os.supports_dir_fd:
                continue
            followed = os.stat(item.name, dir_fd=fd)
            self.assertEqual(item.stat(), followed)
            not_followed = os.stat(item.name, dir_fd=fd,
                                   follow_symlinks=False)
            self.assertEqual(item.stat(follow_symlinks=False),
                             not_followed)
    finally:
        os.close(fd)
4109
def test_empty_path(self):
    """Scanning the empty string raises FileNotFoundError."""
    with self.assertRaises(FileNotFoundError):
        os.scandir('')
4112
def test_consume_iterator_twice(self):
    """Consuming an exhausted scandir iterator again yields nothing."""
    self.create_file("file.txt")
    iterator = os.scandir(self.path)

    first_pass = list(iterator)
    self.assertEqual(len(first_pass), 1, first_pass)

    # check that consuming the iterator twice doesn't raise exception
    second_pass = list(iterator)
    self.assertEqual(len(second_pass), 0, second_pass)
4123
def test_bad_path_type(self):
    """A float, dict or list path makes scandir() raise TypeError."""
    bad_paths = [1.234, {}, []]
    for obj in bad_paths:
        self.assertRaises(TypeError, os.scandir, obj)
4127
def test_close(self):
    """close() may be called repeatedly on a partly consumed iterator."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    iterator = os.scandir(self.path)
    next(iterator)
    iterator.close()
    # multiple closes
    iterator.close()
    with self.check_no_resource_warning():
        del iterator
4138
def test_context_manager(self):
    """Use a partly consumed scandir iterator as a context manager."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with os.scandir(self.path) as iterator:
        next(iterator)
    # Dropping the iterator after the block must not warn.
    with self.check_no_resource_warning():
        del iterator
4146
def test_context_manager_close(self):
    """close() may be called explicitly inside the ``with`` block."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with os.scandir(self.path) as iterator:
        next(iterator)
        iterator.close()
4153
def test_context_manager_exception(self):
    """Leave the scandir ``with`` block through an exception."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)
    with self.assertRaises(ZeroDivisionError):
        with os.scandir(self.path) as iterator:
            next(iterator)
            1/0
    # Dropping the iterator afterwards must not warn.
    with self.check_no_resource_warning():
        del iterator
4163
def test_resource_warning(self):
    """Check when dropping a scandir iterator emits ResourceWarning."""
    for name in ("file.txt", "file2.txt"):
        self.create_file(name)

    # partly consumed iterator that was never closed
    iterator = os.scandir(self.path)
    next(iterator)
    with self.assertWarns(ResourceWarning):
        del iterator
        support.gc_collect()

    # exhausted iterator
    iterator = os.scandir(self.path)
    list(iterator)
    with self.check_no_resource_warning():
        del iterator
4177
Victor Stinner6036e442015-03-08 01:58:04 +01004178
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and os.PathLike."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        """bytes input comes back as an equal value."""
        for value in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(value, self.fspath(value))

    def test_return_string(self):
        """str input comes back as an equal value."""
        for value in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(value, self.fspath(value))

    def test_fsencode_fsdecode(self):
        """os.fsencode() and os.fsdecode() accept path-like objects."""
        for raw in ("path/like/object", b"path/like/object"):
            pathlike = FakePath(raw)

            self.assertEqual(raw, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        """FakePath is recognised as an os.PathLike."""
        wrapped = FakePath('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(wrapped))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        """Objects that are not path-like raise TypeError."""
        vapor = type('blah', (), {})
        for candidate in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, candidate)

    def test_argument_required(self):
        """Calling fspath() without an argument raises TypeError."""
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        """Broken __fspath__ implementations raise."""
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        broken = type('foo', (), {})
        broken.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, broken())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        """An abstract PathLike subclass does not claim FakePath."""
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        """Subscripting os.PathLike gives a types.GenericAlias."""
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004235
Victor Stinnerc29b5852017-11-02 07:28:27 -07004236
class TimesTests(unittest.TestCase):
    """Tests for os.times()."""

    def test_times(self):
        """os.times() returns an os.times_result made of floats."""
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        fields = ('user', 'system', 'children_user', 'children_system',
                  'elapsed')
        for field in fields:
            self.assertIsInstance(getattr(result, field), float)

        if os.name == 'nt':
            # These three fields are expected to be zero on Windows.
            for field in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(result, field), 0)
4251
4252
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Explicitly test the pure Python implementation of os.fspath()."""

        # Same tests as the base class, run against os._fspath instead.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004261
4262
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()