blob: d6da4617d50f7f265da3f49bfb553c27eb29eda5 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
Steve Dowerdf2d4a62019-08-21 15:27:33 -070011import fnmatch
Victor Stinner47aacc82015-06-12 17:26:23 +020012import fractions
Victor Stinner47aacc82015-06-12 17:26:23 +020013import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Christian Heimescd9fed62020-11-13 19:48:52 +010018import select
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Christian Heimescd9fed62020-11-13 19:48:52 +010023import struct
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinnerec3e20a2019-06-28 18:01:59 +020027import tempfile
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020028import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020029import time
Guido van Rossum48b069a2020-04-07 09:50:06 -070030import types
Victor Stinner47aacc82015-06-12 17:26:23 +020031import unittest
32import uuid
33import warnings
34from test import support
Hai Shid94af3f2020-08-08 17:32:41 +080035from test.support import import_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080036from test.support import os_helper
Serhiy Storchaka16994912020-04-25 10:06:29 +030037from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +080038from test.support import threading_helper
Hai Shi0c4f0f32020-06-30 21:46:31 +080039from test.support import warnings_helper
Paul Monson62dfd7d2019-04-25 11:36:45 -070040from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020041
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042try:
43 import resource
44except ImportError:
45 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010050try:
51 import _winapi
52except ImportError:
53 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020054try:
R David Murrayf2ad1732014-12-25 18:36:56 -050055 import pwd
56 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010057except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050058 all_users = []
59try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020060 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020061except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Christian Heimescd9fed62020-11-13 19:48:52 +010064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Hai Shi0c4f0f32020-06-30 21:46:31 +080066from test.support import unix_shell
67from test.support.os_helper import FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when the platform exposes os.geteuid() and the effective user
# is root (uid 0); False everywhere else.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated on FreeBSD thanks to the short-circuit.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists.

    *name* is the attribute name to look up on the ``os`` module.  The
    skip reason reported by unittest is ``'requires os.<name>'``.
    """
    return unittest.skipUnless(hasattr(os, name), f'requires os.{name}')
89
90
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation binary mode ("xb"), so
    FileExistsError is raised if *filename* already exists.  Buffering is
    disabled (third argument 0), so the data goes straight to the raw file.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)
94
95
class MiscTests(unittest.TestCase):
    # Tests for os.getcwd() and os.getcwdb().

    def test_getcwd(self):
        # os.getcwd() returns a str.
        cwd = os.getcwd()
        self.assertIsInstance(cwd, str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Use a directory name of 200 characters to fit into Windows MAX_PATH
        # limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        min_len = 2000   # characters
        dirlen = 200     # characters
        dirname = 'python_test_dir_'
        dirname = dirname + ('a' * (dirlen - len(dirname)))

        with tempfile.TemporaryDirectory() as tmpdir:
            with os_helper.change_cwd(tmpdir) as path:
                expected = path

                # Descend one directory per iteration until the current
                # directory is at least min_len characters long or the
                # platform refuses to go deeper.
                while True:
                    cwd = os.getcwd()
                    self.assertEqual(cwd, expected)

                    need = min_len - (len(cwd) + len(os.path.sep))
                    if need <= 0:
                        break
                    # need is strictly positive here, so trimming the last
                    # component never yields an empty name.
                    if len(dirname) > need:
                        dirname = dirname[:need]

                    path = os.path.join(path, dirname)
                    try:
                        os.mkdir(path)
                        # On Windows, chdir() can fail
                        # even if mkdir() succeeded
                        os.chdir(path)
                    except FileNotFoundError:
                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                        # ERROR_FILENAME_EXCED_RANGE (206) errors
                        # ("The filename or extension is too long")
                        break
                    except OSError as exc:
                        if exc.errno == errno.ENAMETOOLONG:
                            break
                        else:
                            raise

                    expected = path

                if support.verbose:
                    print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        # os.getcwdb() returns bytes that decode to the same path as getcwd().
        cwd = os.getcwdb()
        self.assertIsInstance(cwd, bytes)
        self.assertEqual(os.fsdecode(cwd), os.getcwd())
158
159
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    def setUp(self):
        # Make sure no TESTFN is left on disk.  lexists() is used so that a
        # symlink whose target is missing is removed as well.
        if os.path.lexists(os_helper.TESTFN):
            os.unlink(os_helper.TESTFN)
    # The same clean-up runs after every test.
    tearDown = setUp

    def test_access(self):
        # A freshly created file is writable according to os.access().
        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = os_helper.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly a bytes object with the file content.
        with open(os_helper.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(os_helper.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even if an assertion above fails, so it
            # is never leaked.
            os.close(fd)
        with open(os_helper.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command line *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # os.fdopen() works with no mode, a mode, and a mode plus buffer size.
        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() overwrites an existing destination and removes the
        # source name.
        TESTFN2 = os_helper.TESTFN + ".2"
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, TESTFN2)

        create_file(os_helper.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(os_helper.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # os.open() accepts all of its parameters as keywords.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # os.symlink() accepts all of its parameters as keywords.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=os_helper.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = os_helper.TESTFN + ".3"
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_invalid_values(self):
        # A negative byte count is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.splice(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice(self):
        data = b'0123456789'

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        try:
            i = os.splice(in_fd, write_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            self.assertEqual(os.read(read_fd, 100), data[:i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_offset_in(self):
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        in_file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)

        try:
            i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            read = os.read(read_fd, 100)
            # 012 are skipped (in_skip)
            # 345678 are copied into the pipe (in_skip + bytes_to_copy)
            self.assertEqual(read, data[in_skip:in_skip+i])

    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
    def test_splice_offset_out(self):
        TESTFN4 = os_helper.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        out_seek = 3

        create_file(os_helper.TESTFN, data)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # The pipe is the splice source here; preload it with the data.
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)
        os.write(write_fd, data)

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: the file is closed, then unlinked.
        self.addCleanup(os_helper.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as spliced:
                read = spliced.read()
            # the first out_seek (3) bytes of the output are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # the first i bytes taken from the pipe follow the zeroes
            self.assertEqual(read[out_seek:], data[:i])
499
500
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000501# Test attributes on return values from os.*stat* family.
502class StatAttributeTests(unittest.TestCase):
503 def setUp(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +0800504 self.fname = os_helper.TESTFN
505 self.addCleanup(os_helper.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100506 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000507
Antoine Pitrou38425292010-09-21 18:19:07 +0000508 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000509 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000510
511 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000512 self.assertEqual(result[stat.ST_SIZE], 3)
513 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000514
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000515 # Make sure all the attributes are there
516 members = dir(result)
517 for name in dir(stat):
518 if name[:3] == 'ST_':
519 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000520 if name.endswith("TIME"):
521 def trunc(x): return int(x)
522 else:
523 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000524 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000525 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000526 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000527
Larry Hastings6fe20b32012-04-19 15:07:49 -0700528 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700529 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700530 for name in 'st_atime st_mtime st_ctime'.split():
531 floaty = int(getattr(result, name) * 100000)
532 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700533 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700534
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000535 try:
536 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200537 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000538 except IndexError:
539 pass
540
541 # Make sure that assignment fails
542 try:
543 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200544 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000545 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000546 pass
547
548 try:
549 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200550 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000551 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000552 pass
553
554 try:
555 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200556 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000557 except AttributeError:
558 pass
559
560 # Use the stat_result constructor with a too-short tuple.
561 try:
562 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200563 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000564 except TypeError:
565 pass
566
Ezio Melotti42da6632011-03-15 05:18:48 +0200567 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000568 try:
569 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
570 except TypeError:
571 pass
572
Antoine Pitrou38425292010-09-21 18:19:07 +0000573 def test_stat_attributes(self):
574 self.check_stat_attributes(self.fname)
575
576 def test_stat_attributes_bytes(self):
577 try:
578 fname = self.fname.encode(sys.getfilesystemencoding())
579 except UnicodeEncodeError:
580 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700581 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000582
Christian Heimes25827622013-10-12 01:27:08 +0200583 def test_stat_result_pickle(self):
584 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200585 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
586 p = pickle.dumps(result, proto)
587 self.assertIn(b'stat_result', p)
588 if proto < 4:
589 self.assertIn(b'cos\nstat_result\n', p)
590 unpickled = pickle.loads(p)
591 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200592
Serhiy Storchaka43767632013-11-03 21:31:38 +0200593 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000594 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700595 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000596
597 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000598 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000599
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000600 # Make sure all the attributes are there.
601 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
602 'ffree', 'favail', 'flag', 'namemax')
603 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000604 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000605
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100606 self.assertTrue(isinstance(result.f_fsid, int))
607
608 # Test that the size of the tuple doesn't change
609 self.assertEqual(len(result), 10)
610
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000611 # Make sure that assignment really fails
612 try:
613 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200614 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000615 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000616 pass
617
618 try:
619 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200620 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000621 except AttributeError:
622 pass
623
624 # Use the constructor with a too-short tuple.
625 try:
626 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200627 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000628 except TypeError:
629 pass
630
Ezio Melotti42da6632011-03-15 05:18:48 +0200631 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000632 try:
633 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
634 except TypeError:
635 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000636
Christian Heimes25827622013-10-12 01:27:08 +0200637 @unittest.skipUnless(hasattr(os, 'statvfs'),
638 "need os.statvfs()")
639 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700640 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200641
Serhiy Storchakabad12572014-12-15 14:03:42 +0200642 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
643 p = pickle.dumps(result, proto)
644 self.assertIn(b'statvfs_result', p)
645 if proto < 4:
646 self.assertIn(b'cos\nstatvfs_result\n', p)
647 unpickled = pickle.loads(p)
648 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200649
Serhiy Storchaka43767632013-11-03 21:31:38 +0200650 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
651 def test_1686475(self):
652 # Verify that an open file can be stat'ed
653 try:
654 os.stat(r"c:\pagefile.sys")
655 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600656 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200657 except OSError as e:
658 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000659
Serhiy Storchaka43767632013-11-03 21:31:38 +0200660 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
661 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
662 def test_15261(self):
663 # Verify that stat'ing a closed fd does not cause crash
664 r, w = os.pipe()
665 try:
666 os.stat(r) # should not raise error
667 finally:
668 os.close(r)
669 os.close(w)
670 with self.assertRaises(OSError) as ctx:
671 os.stat(r)
672 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100673
Zachary Ware63f277b2014-06-19 09:46:37 -0500674 def check_file_attributes(self, result):
675 self.assertTrue(hasattr(result, 'st_file_attributes'))
676 self.assertTrue(isinstance(result.st_file_attributes, int))
677 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
678
679 @unittest.skipUnless(sys.platform == "win32",
680 "st_file_attributes is Win32 specific")
681 def test_file_attributes(self):
682 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
683 result = os.stat(self.fname)
684 self.check_file_attributes(result)
685 self.assertEqual(
686 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
687 0)
688
689 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800690 dirname = os_helper.TESTFN + "dir"
Victor Stinner47aacc82015-06-12 17:26:23 +0200691 os.mkdir(dirname)
692 self.addCleanup(os.rmdir, dirname)
693
694 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500695 self.check_file_attributes(result)
696 self.assertEqual(
697 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
698 stat.FILE_ATTRIBUTE_DIRECTORY)
699
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_access_denied(self):
    # os.stat() should fall back to FindFirstFile WIN32_FIND_DATA when
    # access is denied (issue 28075).  os.environ['TEMP'] is expected to
    # live on a volume that supports file ACLs.
    DETACHED_PROCESS = 8
    fname = os.path.join(os.environ['TEMP'], self.fname)
    self.addCleanup(os_helper.unlink, fname)
    create_file(fname, b'ABC')

    # Deny the right to [S]YNCHRONIZE on the file so that CreateFile
    # fails with ERROR_ACCESS_DENIED.
    # bpo-30584: the security identifier *S-1-5-32-545 is used instead of
    # the localized "Users" name so the test does not depend on the locale.
    deny_synchronize = ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)']
    subprocess.check_call(deny_synchronize,
                          creationflags=DETACHED_PROCESS)

    result = os.stat(fname)
    self.assertNotEqual(result.st_size, 0)
720
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_stat_block_device(self):
    # bpo-38030: os.stat() used to fail for block devices.
    # Build a device path such as "//./C:" from the current drive.
    drive, _ = os.path.splitdrive(os.getcwd())
    device_stat = os.stat("//./" + drive)
    self.assertEqual(device_stat.st_mode, stat.S_IFBLK)
728
Victor Stinner47aacc82015-06-12 17:26:23 +0200729
class UtimeTests(unittest.TestCase):
    # Tests for os.utime(): each way of passing timestamps (ns=, times=,
    # positional tuple, file descriptor, dir_fd, follow_symlinks=False),
    # "set to now" behaviour, and argument validation.

    def setUp(self):
        # Each test works on a scratch directory containing one file "f1".
        self.dirname = os_helper.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(os_helper.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are
        # different (indexes 7, 8, 9 are the integer atime, mtime, ctime).
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Common driver: call set_time(filename, (atime_ns, mtime_ns)) and
        # check that os.stat() reports the timestamps that were requested.
        # *filename* defaults to the scratch file created by setUp().
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Common driver for the "set to now" tests: set_time(filename) must
        # stamp the file with (approximately) the current system time.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way.  Use the file the
            # driver passes in rather than silently ignoring the argument.
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.  Use the file
            # the driver passes in rather than silently ignoring the argument.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the name of the filesystem holding *path* (e.g. "NTFS"),
        # or None if it cannot be determined.  Only Windows is queried.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        # times and ns must each be a tuple of exactly two items
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # features the platform lacks must be rejected, not ignored
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            # Object whose __divmod__ returns an arbitrary, possibly
            # malformed, value.
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
950
Victor Stinner47aacc82015-06-12 17:26:23 +0200951
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000952from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000953
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown() can restore it, then seed
        # os.environ with the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name in reference:
            os.environ[name] = reference[name]

    def tearDown(self):
        # Put back exactly what setUp() captured.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as popen:
            lines = iter(popen)
            self.assertEqual(next(lines), "line1\n")
            self.assertEqual(next(lines), "line2\n")
            self.assertEqual(next(lines), "line3\n")
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_items = ['{!r}: {!r}'.format(name, text)
                          for name, text in env.items()]
        expected = 'environ({{{}}})'.format(', '.join(rendered_items))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        path_dirs = ['/monty', '/python', '', '/flying/circus']
        env_with_path = {'PATH': os.pathsep.join(path_dirs)}

        original_environ = os.environ
        try:
            os.environ = dict(env_with_path)
            # With no env (or env=None), os.environ must be consulted.
            self.assertSequenceEqual(path_dirs, os.get_exec_path())
            self.assertSequenceEqual(path_dirs, os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(path_dirs, os.get_exec_path(env_with_path))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                with self.assertRaises(ValueError):
                    os.get_exec_path(mixed_env)

            # bytes key and/or value
            bytes_envs = (
                {b'PATH': b'abc'},
                {b'PATH': 'abc'},
                {'PATH': b'abc'},
            )
            for bytes_env in bytes_envs:
                self.assertSequenceEqual(os.get_exec_path(bytes_env),
                                         ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    def test_putenv_unsetenv(self):
        name = "PYTHONTESTVAR"
        value = "testvalue"
        code = f'import os; print(repr(os.environ.get({name!r})))'

        def child_view():
            # Ask a fresh interpreter what it sees for the variable.
            proc = subprocess.run([sys.executable, '-c', code], check=True,
                                  stdout=subprocess.PIPE, text=True)
            return proc.stdout.rstrip()

        with os_helper.EnvironmentVarGuard() as env:
            env.pop(name, None)

            os.putenv(name, value)
            self.assertEqual(child_view(), repr(value))

            os.unsetenv(name)
            self.assertEqual(child_view(), repr(None))

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_putenv_unsetenv_error(self):
        # Empty variable name is invalid.
        # "=" and null character are not allowed in a variable name.
        invalid_names = ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me')
        expected_errors = (OSError, ValueError)
        for name in invalid_names:
            self.assertRaises(expected_errors, os.putenv, name, "value")
            self.assertRaises(expected_errors, os.unsetenv, name)

        if sys.platform == "win32":
            # On Windows, an environment variable string ("name=value" string)
            # is limited to 32,767 characters
            longstr = 'x' * 32_768
            self.assertRaises(ValueError, os.putenv, longstr, "1")
            self.assertRaises(ValueError, os.putenv, "X", longstr)
            self.assertRaises(ValueError, os.unsetenv, longstr)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup: the KeyError must carry the original key object and hide
        # the internal exception context.
        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        lookup_error = cm.exception
        self.assertIs(lookup_error.args[0], missing)
        self.assertTrue(lookup_error.__suppress_context__)

        # Deletion: same expectations.
        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        delete_error = cm.exception
        self.assertIs(delete_error.args[0], missing)
        self.assertTrue(delete_error.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating *collection*, add a key to os.environ, and check
        # that continuing the iteration still works.
        new_key = "__new_key__"
        new_value = "test_environ_iteration"
        iterator = iter(collection)

        next(iterator)  # start iteration over the collection

        # add a new key in os.environ mapping
        os.environ[new_key] = new_value

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())

    def _test_underlying_process_env(self, var, expected):
        # Check, through a child shell, the value of *var* in the real
        # process environment.  Does nothing when no usable shell exists.
        if unix_shell and os.path.exists(unix_shell):
            with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
                value = popen.read().strip()

            self.assertEqual(expected, value)

    def test_or_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        actual = os.environ | new_vars_dict
        self.assertDictEqual(expected, actual)
        self.assertEqual('3', actual[overridden_key])

        # Only mappings are accepted on the right-hand side.
        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))

        # The | operator must not touch the real environment.
        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)

    def test_ior_operator(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        expected = {**os.environ, **new_vars_dict}

        os.environ |= new_vars_dict
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        # The |= operator must update the real environment.
        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ior_operator_invalid_dicts(self):
        snapshot = os.environ.copy()

        dict_with_bad_key = {1: '_A_'}
        with self.assertRaises(TypeError):
            os.environ |= dict_with_bad_key

        dict_with_bad_val = {'_A_': 1}
        with self.assertRaises(TypeError):
            os.environ |= dict_with_bad_val

        # Check nothing was added.
        self.assertEqual(snapshot, os.environ)

    def test_ior_operator_key_value_iterable(self):
        overridden_key = '_TEST_VAR_'
        os.environ[overridden_key] = 'original_value'

        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
        expected = dict(os.environ)
        expected.update(new_vars_items)

        os.environ |= new_vars_items
        self.assertEqual(expected, os.environ)
        self.assertEqual('3', os.environ[overridden_key])

        self._test_underlying_process_env('_A_', '1')
        self._test_underlying_process_env(overridden_key, '3')

    def test_ror_operator(self):
        overridden_key = '_TEST_VAR_'
        original_value = 'original_value'
        os.environ[overridden_key] = original_value

        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
        # os.environ is the right operand, so its values win.
        expected = {**new_vars_dict, **os.environ}

        actual = new_vars_dict | os.environ
        self.assertDictEqual(expected, actual)
        self.assertEqual(original_value, actual[overridden_key])

        new_vars_items = new_vars_dict.items()
        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))

        self._test_underlying_process_env('_A_', '')
        self._test_underlying_process_env(overridden_key, original_value)
1245
Victor Stinner6d101392013-04-14 16:35:04 +02001246
Tim Petersc4e09402003-04-25 07:11:48 +00001247class WalkTests(unittest.TestCase):
1248 """Tests for os.walk()."""
1249
Victor Stinner0561c532015-03-12 10:28:24 +01001250 # Wrapper to hide minor differences between os.walk and os.fwalk
1251 # to tests both functions with the same code base
def walk(self, top, **kwargs):
    # Thin adapter over os.walk(): a 'follow_symlinks' keyword, if given,
    # is renamed to the 'followlinks' keyword that os.walk() expects.
    try:
        kwargs['followlinks'] = kwargs.pop('follow_symlinks')
    except KeyError:
        pass
    return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +01001256
def setUp(self):
    # Build the directory tree that every walk test traverses and record
    # the paths/expected tuples the tests compare against.
    join = os.path.join
    self.addCleanup(os_helper.rmtree, os_helper.TESTFN)

    # Build:
    #     TESTFN/
    #       TEST1/              a file kid and two directory kids
    #         tmp1
    #         SUB1/             a file kid and a directory kid
    #           tmp2
    #           SUB11/          no kids
    #         SUB2/             a file kid and a dirsymlink kid
    #           tmp3
    #           SUB21/          not readable
    #             tmp3          (held in the variable tmp5_path)
    #           link/           a symlink to TEST2
    #           broken_link
    #           broken_link2
    #           broken_link3
    #       TEST2/
    #         tmp4              a lone file
    self.walk_path = join(os_helper.TESTFN, "TEST1")
    self.sub1_path = join(self.walk_path, "SUB1")
    self.sub11_path = join(self.sub1_path, "SUB11")
    sub2_path = join(self.walk_path, "SUB2")
    sub21_path = join(sub2_path, "SUB21")
    tmp1_path = join(self.walk_path, "tmp1")
    tmp2_path = join(self.sub1_path, "tmp2")
    tmp3_path = join(sub2_path, "tmp3")
    # NOTE(review): the variable is called tmp5 but the file is named
    # "tmp3"; broken_link3 below targets SUB21/tmp5, which therefore does
    # not exist -- presumably intentional, confirm before renaming.
    tmp5_path = join(sub21_path, "tmp3")
    self.link_path = join(sub2_path, "link")
    t2_path = join(os_helper.TESTFN, "TEST2")
    tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
    broken_link_path = join(sub2_path, "broken_link")
    broken_link2_path = join(sub2_path, "broken_link2")
    broken_link3_path = join(sub2_path, "broken_link3")

    # Create stuff.
    os.makedirs(self.sub11_path)
    os.makedirs(sub2_path)
    os.makedirs(sub21_path)
    os.makedirs(t2_path)

    # Mode "x" fails if a file already exists, so a stale tree is noticed.
    for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
        with open(path, "x", encoding='utf-8') as f:
            f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

    if os_helper.can_symlink():
        os.symlink(os.path.abspath(t2_path), self.link_path)
        os.symlink('broken', broken_link_path, True)
        os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
        os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
        # Expected (root, dirs, files) for SUB2 when symlinks exist.
        self.sub2_tree = (sub2_path, ["SUB21", "link"],
                          ["broken_link", "broken_link2", "broken_link3",
                           "tmp3"])
    else:
        # Expected (root, dirs, files) for SUB2 without symlinks.
        self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

    # Make SUB21 inaccessible, then probe whether that actually took effect.
    os.chmod(sub21_path, 0)
    try:
        os.listdir(sub21_path)
    except PermissionError:
        # The restriction works: restore access at cleanup time so the
        # tree can be removed.
        self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
    else:
        # The directory is still listable, so it would be walked: remove
        # SUB21 and drop it (the first dirs entry) from the expected tree.
        os.chmod(sub21_path, stat.S_IRWXU)
        os.unlink(tmp5_path)
        os.rmdir(sub21_path)
        del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001325
Victor Stinner0561c532015-03-12 10:28:24 +01001326 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001327 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001328 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001329
Tim Petersc4e09402003-04-25 07:11:48 +00001330 self.assertEqual(len(all), 4)
1331 # We can't know which order SUB1 and SUB2 will appear in.
1332 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1333 # flipped: TESTFN, SUB2, SUB1, SUB11
1334 flipped = all[0][1][0] != "SUB1"
1335 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001336 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001337 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001338 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1339 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1340 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1341 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001342
Brett Cannon3f9183b2016-08-26 14:44:48 -07001343 def test_walk_prune(self, walk_path=None):
1344 if walk_path is None:
1345 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001346 # Prune the search.
1347 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001348 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001349 all.append((root, dirs, files))
1350 # Don't descend into SUB1.
1351 if 'SUB1' in dirs:
1352 # Note that this also mutates the dirs we appended to all!
1353 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001354
Victor Stinner0561c532015-03-12 10:28:24 +01001355 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001356 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001357
1358 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001359 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001360 self.assertEqual(all[1], self.sub2_tree)
1361
def test_file_like_path(self):
    # Re-run the pruning scenario with the top directory wrapped in a
    # FakePath object instead of a plain string.
    wrapped_top = FakePath(self.walk_path)
    self.test_walk_prune(wrapped_top)
Brett Cannon3f9183b2016-08-26 14:44:48 -07001364
Victor Stinner0561c532015-03-12 10:28:24 +01001365 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001366 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001367 all = list(self.walk(self.walk_path, topdown=False))
1368
Victor Stinner53b0a412016-03-26 01:12:36 +01001369 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001370 # We can't know which order SUB1 and SUB2 will appear in.
1371 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1372 # flipped: SUB2, SUB11, SUB1, TESTFN
1373 flipped = all[3][1][0] != "SUB1"
1374 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001375 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001376 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001377 self.assertEqual(all[3],
1378 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1379 self.assertEqual(all[flipped],
1380 (self.sub11_path, [], []))
1381 self.assertEqual(all[flipped + 1],
1382 (self.sub1_path, ["SUB11"], ["tmp2"]))
1383 self.assertEqual(all[2 - 2 * flipped],
1384 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001385
def test_walk_symlink(self):
    if not os_helper.can_symlink():
        self.skipTest("need symlink support")

    # Walk, following symlinks, and stop at the first entry whose root is
    # the symlinked directory.
    walk_it = self.walk(self.walk_path, follow_symlinks=True)
    link_entry = next(
        (entry for entry in walk_it if entry[0] == self.link_path),
        None)
    if link_entry is None:
        self.fail("Didn't follow symlink with followlinks=True")

    _, dirs, files = link_entry
    self.assertEqual(dirs, [])
    self.assertEqual(files, ["tmp4"])
Guido van Rossumd8faa362007-04-27 19:54:29 +00001399
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001400 def test_walk_bad_dir(self):
1401 # Walk top-down.
1402 errors = []
1403 walk_it = self.walk(self.walk_path, onerror=errors.append)
1404 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001405 self.assertEqual(errors, [])
1406 dir1 = 'SUB1'
1407 path1 = os.path.join(root, dir1)
1408 path1new = os.path.join(root, dir1 + '.new')
1409 os.rename(path1, path1new)
1410 try:
1411 roots = [r for r, d, f in walk_it]
1412 self.assertTrue(errors)
1413 self.assertNotIn(path1, roots)
1414 self.assertNotIn(path1new, roots)
1415 for dir2 in dirs:
1416 if dir2 != dir1:
1417 self.assertIn(os.path.join(root, dir2), roots)
1418 finally:
1419 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001420
def test_walk_many_open_files(self):
    """Advance 100 concurrent walkers in lockstep over a 30-level deep tree."""
    depth = 30
    base = os.path.join(os_helper.TESTFN, 'deep')
    deepest = os.path.join(base, *(['d'] * depth))
    os.makedirs(deepest)

    # Bottom-up: the deepest (empty) directory comes first, then each parent.
    walkers = [self.walk(base, topdown=False) for _ in range(100)]
    current = deepest
    for level in range(depth + 1):
        subdirs = ['d'] if level else []
        expected = (current, subdirs, [])
        for walker in walkers:
            self.assertEqual(next(walker), expected)
        current = os.path.dirname(current)

    # Top-down: start at the base and descend one 'd' at a time.
    walkers = [self.walk(base, topdown=True) for _ in range(100)]
    current = base
    for level in range(depth + 1):
        subdirs = ['d'] if level < depth else []
        expected = (current, subdirs, [])
        for walker in walkers:
            self.assertEqual(next(walker), expected)
        current = os.path.join(current, 'd')
1441
Charles-François Natali7372b062012-02-05 15:15:38 +01001442
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt fwalk() to walk()'s 3-tuple shape by dropping the directory fd."""
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); kept as a method so subclasses can override it."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then every combination of topdown and
        follow-symlinks is tried; each root yielded by fwalk() must have been
        yielded by os.walk() with the same sets of dirs and files.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The same option is spelled differently by the two functions.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': os_helper.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # fails there is nothing to close, and the original error must not be
        # masked by a NameError on 'fd' in the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': os_helper.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = os_helper.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(os_helper.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1513
1514
class BytesWalkTests(WalkTests):
    """Run the inherited walk tests against os.walk() called with a bytes path."""

    def walk(self, top, **kwargs):
        # os.walk() spells the option 'followlinks'.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Write any edits the consumer made to the decoded lists back
            # into the bytes lists that os.walk() itself is iterating over.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1527
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files, topfd)
            # Write any edits the consumer made to the decoded lists back
            # into the bytes lists that os.fwalk() itself is iterating over.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1539
Charles-François Natali7372b062012-02-05 15:15:38 +01001540
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), each run inside a freshly created TESTFN dir."""

    def setUp(self):
        os.mkdir(os_helper.TESTFN)

    def test_makedir(self):
        base = os_helper.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with os_helper.temp_umask(0o002):
            base = os_helper.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The explicit mode applies to the leaf only; the
                # intermediate directory gets 0o777 filtered by the umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        mode = 0o777
        # Use the context manager (as test_mode does) so the process umask
        # is put back even when one of the assertions below fails.
        with os_helper.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(os_helper.TESTFN).st_mode)
            try:
                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(os_helper.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            # A regular file in the way is always an error, whatever exist_ok is.
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file even on failure: tearDown() uses
            # os.removedirs(), which cannot delete a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != os_helper.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1633
Andrew Svetlov405faed2012-12-25 12:18:09 +02001634
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), using a TESTFN directory shared by the class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(os_helper.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = os_helper.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # Non-integer numbers must be rejected for both uid and gid.
        non_index_values = (
            -1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(os_helper.TESTFN).st_uid

        # Switch to each group in turn and read the result back.
        for wanted_gid in (gid_1, gid_2):
            os.chown(os_helper.TESTFN, uid, wanted_gid)
            gid = os.stat(os_helper.TESTFN).st_gid
            self.assertEqual(gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        # Hand the directory to each user in turn and read the owner back.
        for wanted_uid in (uid_1, uid_2):
            os.chown(os_helper.TESTFN, wanted_uid, gid)
            uid = os.stat(os_helper.TESTFN).st_uid
            self.assertEqual(uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(os_helper.TESTFN).st_gid
        # Both calls share one assertRaises block, so the check passes as
        # soon as either of them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(os_helper.TESTFN, uid_1, gid)
            os.chown(os_helper.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(os_helper.TESTFN)
R David Murrayf2ad1732014-12-25 18:36:56 -05001693
1694
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(os_helper.TESTFN)

    def tearDown(self):
        os_helper.rmtree(os_helper.TESTFN)

    def _make_chain(self):
        # Create TESTFN/dira/dirb and return both paths.
        dira = os.path.join(os_helper.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain goes away.
        dira, dirb = self._make_chain()
        os.removedirs(dirb)
        for gone in (dirb, dira, os_helper.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in the leaf itself makes removedirs() fail outright.
        dira, dirb = self._make_chain()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, os_helper.TESTFN):
            self.assertTrue(os.path.exists(kept))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001734
1735
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        # Writes are accepted and discarded.  The file is opened unbuffered
        # (buffering=0) and closed by the with statement, so no explicit
        # close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reads always hit end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001743
Andrew Svetlov405faed2012-12-25 12:18:09 +02001744
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child interpreter."""
        lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        result = assert_python_ok('-c', '\n'.join(lines))
        # Index 1 of the result is the child's stdout.
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001774
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001775
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1817
1818
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY', 'HAVE_GETRANDOM', 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001825
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter, because each one tampers
    with process-wide state (the open-file limit or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful exit is checked here, not the bytes written.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        create_file(os_helper.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=os_helper.TESTFN)
        # The two 4-byte reads inside one child must differ from each other
        # (TESTFN holds only repeated b"x", so reading from it would not) ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second child must not reproduce the first one's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1905
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001906
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001907@contextlib.contextmanager
1908def _execvpe_mockup(defpath=None):
1909 """
1910 Stubs out execv and execve functions when used as context manager.
1911 Records exec calls. The mock execv and execve functions always raise an
1912 exception as they would normally never return.
1913 """
1914 # A list of tuples containing (function name, first arg, args)
1915 # of calls to execv or execve that have been made.
1916 calls = []
1917
1918 def mock_execv(name, *args):
1919 calls.append(('execv', name, args))
1920 raise RuntimeError("execv called")
1921
1922 def mock_execve(name, *args):
1923 calls.append(('execve', name, args))
1924 raise OSError(errno.ENOTDIR, "execve called")
1925
1926 try:
1927 orig_execv = os.execv
1928 orig_execve = os.execve
1929 orig_defpath = os.defpath
1930 os.execv = mock_execv
1931 os.execve = mock_execve
1932 if defpath is not None:
1933 os.defpath = defpath
1934 yield calls
1935 finally:
1936 os.execv = orig_execv
1937 os.execve = orig_execve
1938 os.defpath = orig_defpath
1939
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for argument checking and path lookup in the os.exec*() family."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check which exec function os._execvpe() calls, and with what path.

        *test_type* is either bytes or str and selects the type of the
        program name handed to os._execvpe().
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (
                fullpath if os.name == "nt" else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised off Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            self.assertTrue(exc.winerror is None or exc.winerror != 0)
        else:
            self.fail('No OSError raised')
2044
Gregory P. Smith4ae37772010-05-08 18:05:46 +00002045
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for unusable paths on Windows.

    setUp() guarantees that os_helper.TESTFN does not exist, so most tests
    simply call an os function on that missing path and expect OSError.
    """

    def setUp(self):
        # Every test relies on TESTFN being absent; fail early otherwise.
        try:
            os.stat(os_helper.TESTFN)
        except FileNotFoundError:
            # Expected outcome: the path is missing.
            pass
        except OSError as exc:
            # os.stat() failed for another reason, so the precondition
            # cannot be confirmed.
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (os_helper.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % os_helper.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          os_helper.TESTFN, os_helper.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, os_helper.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)

    def test_mkdir(self):
        # Unlike the other tests, this one creates TESTFN (as a regular
        # file) and then expects os.mkdir() on the same path to fail.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # "x" mode creates the file and fails if it already exists.
        with open(os_helper.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00002080
Victor Stinnere77c9742016-03-25 10:28:23 +01002081
class TestInvalidFD(unittest.TestCase):
    """Verify that fd-based os functions reject an invalid file descriptor.

    Most tests go through check(), which calls the function with the
    descriptor returned by os_helper.make_bad_fd() and expects an OSError
    whose errno is EBADF.
    """

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately not listed in ``singles``: it doesn't raise
    # an exception on some platforms.

    def get_single(f):
        # Build the test method for os.<f>, a function whose only argument
        # is the file descriptor.
        def helper(self):
            # Functions that this platform lacks are passed over silently.
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper

    # Generate one test_<name> method per entry of ``singles``.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF."""
        try:
            f(os_helper.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # os.isatty() is expected to answer False rather than raise.
        bad_fd = os_helper.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = os_helper.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                # fd+i is invalid as required; keep probing.
                continue
            # fd+i is a live descriptor: the usable range ends before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf is handed the bad descriptor as well, not a path.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate is handed the bad descriptor as well, not a path.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buffers = [bytearray(10)]
        self.check(os.readv, buffers)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
2180
Brian Curtin1b9df392010-11-24 20:24:31 +00002181
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # file1 is the link source, file2 the hard link created from it.
        self.file1 = os_helper.TESTFN
        self.file2 = os_helper.TESTFN + "2"

    def tearDown(self):
        # Read the attributes here rather than caching them in setUp():
        # test_unicode_name() rebinds both names.
        for file in (self.file1, self.file2):
            if os.path.exists(file):
                os.unlink(file)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it to *file2* and compare the two.

        The test is skipped when os.link() raises PermissionError.
        """
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, unlike a bare bytes(name, encoding) conversion.
        self._test_link(os.fsencode(self.file1),
                        os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
2218
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument validation tests for the os.set*uid() / os.set*gid() family.

    Each test checks that a non-integer argument raises TypeError and that
    the value ``1 << 32`` raises OverflowError.  When the process does not
    run as uid 0, switching to id 0 is additionally expected to raise
    OSError (the gid variants also require HAVE_WHEEL_GROUP to be false).
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # The TypeError check must target os.seteuid(), the function this
        # test is about (and the only one the skip condition guarantees).
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Both argument positions are validated independently.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Both argument positions are validated independently.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00002286
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise os functions on files with non-ASCII or unencodable names.

    setUp() creates a directory holding one empty file per name that could
    be encoded with os.fsencode(); the tests then list, open and stat them.
    """

    def setUp(self):
        # Pick the first available name, in order of preference:
        # unencodable, non-ASCII, then the plain test file name.
        self.dir = (os_helper.TESTFN_UNENCODABLE
                    or os_helper.TESTFN_NONASCII
                    or os_helper.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Gather the candidate file names; the two optional ones are only
        # used when os_helper provides them.
        candidates = [os_helper.TESTFN_UNICODE]
        if os_helper.TESTFN_UNENCODABLE:
            candidates.append(os_helper.TESTFN_UNENCODABLE)
        if os_helper.TESTFN_NONASCII:
            candidates.append(os_helper.TESTFN_NONASCII)

        # Keep only the names that can be encoded to bytes.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                os_helper.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # The directory was created above, so remove it before
            # re-raising whatever went wrong.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        original_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(original_cwd)

    def test_open(self):
        # Opening must succeed; the content is irrelevant.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002358
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready. When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1 seconds apart, for the ready message.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without ``break``: either the
            # attempts ran out or the child exited before reporting in.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used in the failure message.  The child runs
        win_console_handler.py and is given the tag name of a one-byte
        shared memory mapping; this test waits for that byte to become 1
        (presumably written by the child script once it is ready) before
        sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 also means the child has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2473
2474
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1 (directories) and FILE0, FILE1
        # (text files), remembering the sorted entry names.
        base = os_helper.TESTFN
        names = []
        for i in range(2):
            dir_name = 'SUB%d' % i
            file_name = 'FILE%d' % i
            os.makedirs(os.path.join(base, dir_name))
            file_path = os.path.join(base, file_name)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        names.sort()
        self.created_paths = names

    def tearDown(self):
        shutil.rmtree(os_helper.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_listing = sorted(os.listdir(os_helper.TESTFN))
        self.assertEqual(str_listing, self.created_paths)

        # bytes
        bytes_listing = sorted(os.listdir(os.fsencode(os_helper.TESTFN)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(os_helper.TESTFN)

        # unicode
        str_path = '\\\\?\\' + absolute
        self.assertEqual(
            sorted(os.listdir(str_path)),
            self.created_paths)

        # bytes
        bytes_path = b'\\\\?\\' + os.fsencode(absolute)
        self.assertEqual(
            sorted(os.listdir(bytes_path)),
            [os.fsencode(name) for name in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002521
2522
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    # Link name and link target, each as str and as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        """Assert that two paths are equal after case normalization.

        On Windows a leading ``\\\\?\\`` prefix is dropped from either
        path before the comparison.
        """
        def strip_extended_prefix(p):
            # Bad practice to blindly strip the prefix as it may be required
            # to correctly refer to the file, but we're only comparing paths
            # here.
            prefix = b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\'
            return p[4:] if p.startswith(prefix) else p

        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            left = strip_extended_prefix(left)
            right = strip_extended_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # Reading a regular file as a link fails for both str and
        # path-like arguments.
        filelink_target = FakePath(self.filelink_target)
        self.assertRaises(OSError, os.readlink, self.filelink_target)
        self.assertRaises(OSError, os.readlink, filelink_target)

    def test_missing_link(self):
        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
        self.assertRaises(FileNotFoundError, os.readlink,
                          FakePath('missing-link'))

    @os_helper.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(os_helper.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @os_helper.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        # A bytes link name must produce a bytes result.
        self.assertIsInstance(resolved, bytes)

    @os_helper.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(os_helper.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        # A bytes link name must produce a bytes result.
        self.assertIsInstance(resolved, bytes)
2582
2583
Tim Golden781bbeb2013-10-25 20:24:06 +01002584@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002585@os_helper.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002586class Win32SymlinkTests(unittest.TestCase):
2587 filelink = 'filelinktest'
2588 filelink_target = os.path.abspath(__file__)
2589 dirlink = 'dirlinktest'
2590 dirlink_target = os.path.dirname(filelink_target)
2591 missing_link = 'missing link'
2592
def setUp(self):
    """Verify the preconditions shared by all symlink tests.

    Both link targets must exist and none of the link names may be taken.
    unittest assertions are used instead of bare ``assert`` statements so
    the checks still run when Python is started with -O.
    """
    self.assertTrue(os.path.exists(self.dirlink_target))
    self.assertTrue(os.path.exists(self.filelink_target))
    self.assertFalse(os.path.exists(self.dirlink))
    self.assertFalse(os.path.exists(self.filelink))
    self.assertFalse(os.path.exists(self.missing_link))
2599
def tearDown(self):
    """Remove whichever of the three links a test left behind."""
    # (presence check, path, removal function) for each possible leftover.
    # The missing link is probed with lexists() rather than exists().
    leftovers = (
        (os.path.exists, self.filelink, os.remove),
        (os.path.exists, self.dirlink, os.rmdir),
        (os.path.lexists, self.missing_link, os.remove),
    )
    for is_present, path, delete in leftovers:
        if is_present(path):
            delete(path)
2607
2608 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002609 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002610 self.assertTrue(os.path.exists(self.dirlink))
2611 self.assertTrue(os.path.isdir(self.dirlink))
2612 self.assertTrue(os.path.islink(self.dirlink))
2613 self.check_stat(self.dirlink, self.dirlink_target)
2614
2615 def test_file_link(self):
2616 os.symlink(self.filelink_target, self.filelink)
2617 self.assertTrue(os.path.exists(self.filelink))
2618 self.assertTrue(os.path.isfile(self.filelink))
2619 self.assertTrue(os.path.islink(self.filelink))
2620 self.check_stat(self.filelink, self.filelink_target)
2621
2622 def _create_missing_dir_link(self):
2623 'Create a "directory" link to a non-existent target'
2624 linkname = self.missing_link
2625 if os.path.lexists(linkname):
2626 os.remove(linkname)
2627 target = r'c:\\target does not exist.29r3c740'
2628 assert not os.path.exists(target)
2629 target_is_dir = True
2630 os.symlink(target, linkname, target_is_dir)
2631
2632 def test_remove_directory_link_to_missing_target(self):
2633 self._create_missing_dir_link()
2634 # For compatibility with Unix, os.remove will check the
2635 # directory status and call RemoveDirectory if the symlink
2636 # was created with target_is_dir==True.
2637 os.remove(self.missing_link)
2638
Brian Curtind40e6f72010-07-08 21:39:08 +00002639 def test_isdir_on_directory_link_to_missing_target(self):
2640 self._create_missing_dir_link()
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002641 self.assertFalse(os.path.isdir(self.missing_link))
Brian Curtind40e6f72010-07-08 21:39:08 +00002642
Brian Curtind40e6f72010-07-08 21:39:08 +00002643 def test_rmdir_on_directory_link_to_missing_target(self):
2644 self._create_missing_dir_link()
Brian Curtind40e6f72010-07-08 21:39:08 +00002645 os.rmdir(self.missing_link)
2646
2647 def check_stat(self, link, target):
2648 self.assertEqual(os.stat(link), os.stat(target))
2649 self.assertNotEqual(os.lstat(link), os.stat(link))
2650
Brian Curtind25aef52011-06-13 15:16:04 -05002651 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002652 self.assertEqual(os.stat(bytes_link), os.stat(target))
2653 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002654
2655 def test_12084(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002656 level1 = os.path.abspath(os_helper.TESTFN)
Brian Curtind25aef52011-06-13 15:16:04 -05002657 level2 = os.path.join(level1, "level2")
2658 level3 = os.path.join(level2, "level3")
Hai Shi0c4f0f32020-06-30 21:46:31 +08002659 self.addCleanup(os_helper.rmtree, level1)
Victor Stinnerae39d232016-03-24 17:12:55 +01002660
2661 os.mkdir(level1)
2662 os.mkdir(level2)
2663 os.mkdir(level3)
2664
2665 file1 = os.path.abspath(os.path.join(level1, "file1"))
2666 create_file(file1)
2667
2668 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002669 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002670 os.chdir(level2)
2671 link = os.path.join(level2, "link")
2672 os.symlink(os.path.relpath(file1), "link")
2673 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002674
Victor Stinnerae39d232016-03-24 17:12:55 +01002675 # Check os.stat calls from the same dir as the link
2676 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002677
Victor Stinnerae39d232016-03-24 17:12:55 +01002678 # Check os.stat calls from a dir below the link
2679 os.chdir(level1)
2680 self.assertEqual(os.stat(file1),
2681 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002682
Victor Stinnerae39d232016-03-24 17:12:55 +01002683 # Check os.stat calls from a dir above the link
2684 os.chdir(level3)
2685 self.assertEqual(os.stat(file1),
2686 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002687 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002688 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002689
SSE43c34aad2018-02-13 00:10:35 +07002690 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2691 and os.path.exists(r'C:\ProgramData'),
2692 'Test directories not found')
2693 def test_29248(self):
2694 # os.symlink() calls CreateSymbolicLink, which creates
2695 # the reparse data buffer with the print name stored
2696 # first, so the offset is always 0. CreateSymbolicLink
2697 # stores the "PrintName" DOS path (e.g. "C:\") first,
2698 # with an offset of 0, followed by the "SubstituteName"
2699 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2700 # the other hand, seems to have been created manually
2701 # with an inverted order.
2702 target = os.readlink(r'C:\Users\All Users')
2703 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2704
Steve Dower6921e732018-03-05 14:26:08 -08002705 def test_buffer_overflow(self):
2706 # Older versions would have a buffer overflow when detecting
2707 # whether a link source was a directory. This test ensures we
2708 # no longer crash, but does not otherwise validate the behavior
2709 segment = 'X' * 27
2710 path = os.path.join(*[segment] * 10)
2711 test_cases = [
2712 # overflow with absolute src
2713 ('\\' + path, segment),
2714 # overflow dest with relative src
2715 (segment, path),
2716 # overflow when joining src
2717 (path[:180], path[:180]),
2718 ]
2719 for src, dest in test_cases:
2720 try:
2721 os.symlink(src, dest)
2722 except FileNotFoundError:
2723 pass
2724 else:
2725 try:
2726 os.remove(dest)
2727 except OSError:
2728 pass
2729 # Also test with bytes, since that is a separate code path.
2730 try:
2731 os.symlink(os.fsencode(src), os.fsencode(dest))
2732 except FileNotFoundError:
2733 pass
2734 else:
2735 try:
2736 os.remove(dest)
2737 except OSError:
2738 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002739
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002740 def test_appexeclink(self):
2741 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
Steve Dower374be592019-08-21 17:42:56 -07002742 if not os.path.isdir(root):
2743 self.skipTest("test requires a WindowsApps directory")
2744
Steve Dowerdf2d4a62019-08-21 15:27:33 -07002745 aliases = [os.path.join(root, a)
2746 for a in fnmatch.filter(os.listdir(root), '*.exe')]
2747
2748 for alias in aliases:
2749 if support.verbose:
2750 print()
2751 print("Testing with", alias)
2752 st = os.lstat(alias)
2753 self.assertEqual(st, os.stat(alias))
2754 self.assertFalse(stat.S_ISLNK(st.st_mode))
2755 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2756 # testing the first one we see is sufficient
2757 break
2758 else:
2759 self.skipTest("test requires an app execution alias")
2760
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # The junction is created in the current directory and points at the
    # directory that contains this test file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # The target must exist and no stale junction may be present.
        assert os.path.exists(self.junction_target)
        assert not os.path.lexists(self.junction)

    def tearDown(self):
        # lexists() so that the junction itself is detected, not its target.
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        junction, target = self.junction, self.junction_target
        _winapi.CreateJunction(target, junction)
        for predicate in (os.path.lexists, os.path.exists, os.path.isdir):
            self.assertTrue(predicate(junction))
        # stat() follows the junction, lstat() does not.
        self.assertNotEqual(os.stat(junction), os.lstat(junction))
        self.assertEqual(os.stat(junction), os.stat(target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(junction))
        # readlink() still resolves it; the result carries a "\\?\" prefix.
        expected = os.path.normcase("\\\\?\\" + target)
        self.assertEqual(expected,
                         os.path.normcase(os.readlink(junction)))

    def test_unlink_removes_junction(self):
        junction = self.junction
        _winapi.CreateJunction(self.junction_target, junction)
        self.assertTrue(os.path.exists(junction))
        self.assertTrue(os.path.lexists(junction))

        os.unlink(junction)
        self.assertFalse(os.path.exists(junction))
2794
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Check that nt._getfinalpathname() and os.stat() do not leak
        # handles: the process handle count must be the same before and
        # after a batch of calls, whether the calls succeed or fail.
        nt = import_helper.import_module('nt')
        ctypes = import_helper.import_module('ctypes')
        import ctypes.wintypes

        wintypes = ctypes.wintypes
        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = wintypes.DWORD()

        def current_handle_count():
            # Refresh handle_count in place and return the new value.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = current_handle_count()

        # The device-style names are meant to exercise the error path,
        # __file__ the success path; any exception is ignored either way.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                try:
                    nt._getfinalpathname(name)
                except Exception:
                    # Failure is expected
                    pass
                try:
                    os.stat(name)
                except Exception:
                    pass

        handle_delta = current_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002844
@os_helper.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Removes the whole tree, including the link created by the test.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        # 'some_dir' is relative to the link's own directory ('base'), not
        # to the current working directory.
        os.symlink('some_dir', src)
        # Use a unittest assertion rather than a bare ``assert``: the latter
        # is compiled away under ``python -O`` and the test would then check
        # nothing at all.
        self.assertTrue(os.path.isdir(src))
2875
2876
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # fsencode() on bytes and fsdecode() on str hand the value back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: nothing to
                # round-trip for this name.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002890
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002891
Brett Cannonefb00c02012-02-29 18:31:31 -05002892
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a terminal here (see the skip condition), so an encoding
        # name must be reported and must be one the codecs module knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2906
2907
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002908class PidTests(unittest.TestCase):
2909 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2910 def test_getppid(self):
2911 p = subprocess.Popen([sys.executable, '-c',
2912 'import os; print(os.getppid())'],
2913 stdout=subprocess.PIPE)
2914 stdout, _ = p.communicate()
2915 # We are the parent of our subprocess
2916 self.assertEqual(int(stdout), os.getpid())
2917
Victor Stinner9bee32b2020-04-22 16:30:35 +02002918 def check_waitpid(self, code, exitcode, callback=None):
2919 if sys.platform == 'win32':
2920 # On Windows, os.spawnv() simply joins arguments with spaces:
2921 # arguments need to be quoted
2922 args = [f'"{sys.executable}"', '-c', f'"{code}"']
2923 else:
2924 args = [sys.executable, '-c', code]
2925 pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002926
Victor Stinner9bee32b2020-04-22 16:30:35 +02002927 if callback is not None:
2928 callback(pid)
Victor Stinner65a796e2020-04-01 18:49:29 +02002929
Victor Stinner9bee32b2020-04-22 16:30:35 +02002930 # don't use support.wait_process() to test directly os.waitpid()
2931 # and os.waitstatus_to_exitcode()
Victor Stinner65a796e2020-04-01 18:49:29 +02002932 pid2, status = os.waitpid(pid, 0)
2933 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2934 self.assertEqual(pid2, pid)
2935
Victor Stinner9bee32b2020-04-22 16:30:35 +02002936 def test_waitpid(self):
2937 self.check_waitpid(code='pass', exitcode=0)
2938
2939 def test_waitstatus_to_exitcode(self):
2940 exitcode = 23
2941 code = f'import sys; sys.exit({exitcode})'
2942 self.check_waitpid(code, exitcode=exitcode)
2943
2944 with self.assertRaises(TypeError):
2945 os.waitstatus_to_exitcode(0.0)
2946
2947 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2948 def test_waitpid_windows(self):
2949 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2950 # with exit code larger than INT_MAX.
2951 STATUS_CONTROL_C_EXIT = 0xC000013A
2952 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2953 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2954
2955 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2956 def test_waitstatus_to_exitcode_windows(self):
2957 max_exitcode = 2 ** 32 - 1
2958 for exitcode in (0, 1, 5, max_exitcode):
2959 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2960 exitcode)
2961
2962 # invalid values
2963 with self.assertRaises(ValueError):
2964 os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2965 with self.assertRaises(OverflowError):
2966 os.waitstatus_to_exitcode(-1)
2967
Victor Stinner65a796e2020-04-01 18:49:29 +02002968 # Skip the test on Windows
2969 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2970 def test_waitstatus_to_exitcode_kill(self):
Victor Stinner9bee32b2020-04-22 16:30:35 +02002971 code = f'import time; time.sleep({support.LONG_TIMEOUT})'
Victor Stinner65a796e2020-04-01 18:49:29 +02002972 signum = signal.SIGKILL
Victor Stinner65a796e2020-04-01 18:49:29 +02002973
Victor Stinner9bee32b2020-04-22 16:30:35 +02002974 def kill_process(pid):
2975 os.kill(pid, signum)
Victor Stinner65a796e2020-04-01 18:49:29 +02002976
Victor Stinner9bee32b2020-04-22 16:30:35 +02002977 self.check_waitpid(code, exitcode=-signum, callback=kill_process)
Victor Stinner65a796e2020-04-01 18:49:29 +02002978
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002979
class SpawnTests(unittest.TestCase):
    def create_args(self, *, with_env=False, use_bytes=False):
        # Write a small script to TESTFN that exits with self.exitcode and
        # return the argv list ([interpreter, script]) that runs it.
        #
        # with_env:  also build self.env, a copy of os.environ extended with
        #            a unique variable (self.key) that the script reads, so
        #            a spawn that drops the environment makes the child fail.
        # use_bytes: return the arguments (and self.env, when it was built)
        #            encoded with os.fsencode().
        self.exitcode = 17

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        # Explicit encoding: the script must not depend on the locale.
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

        # Test for PyUnicode_FSConverter()
        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        support.wait_process(pid, exitcode=self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    # The *_noargs tests check that a missing argument list, or one whose
    # first element is an empty string, is rejected with ValueError.

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        # *spawn* is called as spawn(mode, path, args, env), i.e. it is one
        # of the spawn functions that take an explicit environment.
        args = [sys.executable, '-c', 'pass']

        # Entries that cannot be represented in a process environment.  The
        # call must either raise ValueError or return exit status 127.
        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
3155
Serhiy Storchaka77703942017-06-25 07:33:01 +03003156
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced, so it is skipped unconditionally for now to let
# the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
3165
3166
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        # Raise the nice value by one and read it back.
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Starting at 19 or above, the increment cannot be observed
                # reliably, so the check is skipped.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original value; EACCES from that attempt
            # is tolerated, any other error propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
3189
3190
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Small asyncore-based TCP server that runs in its own thread.

    The listening socket is bound to *address* in the constructor; the
    actual host and port are available as ``host`` and ``port``.  An
    accepted connection is wrapped in a ``Handler`` and stored in
    ``handler_instance``.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that records everything it receives."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            # Received chunks, in arrival order (see get_data()).
            self.in_buffer = []
            # When false, received data is read but thrown away.
            self.accumulate = True
            # Set once the connection has been closed (see handle_close()).
            self.closed = False
            # Greeting that clients can wait for to synchronize.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all the data accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled instead of
            # using the default asyncore error handling.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Only the first two items of the socket name are kept.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True from the moment run() starts until stop() is called."""
        return self._active

    def start(self):
        """Start the server thread and return once run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the serving loop to finish and wait for the thread to end."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        # Let start() return: the loop is about to run.
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` guarantees that the lock is released even when
            # asyncore.loop() raises (the handle_error() methods re-raise);
            # a bare acquire()/release() pair would leave it held.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, _addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    # Reading on the listening socket is handled like a connect event.
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3275
3276
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a SendfileTestServer on the local host.

    setUpClass() creates os_helper.TESTFN from DATA via create_file(); each
    test sends (part of) a file over a client socket and compares what the
    server's handler collected with what was expected.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # The headers/trailers tests are skipped on Linux, Solaris and SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')
    requires_32b = unittest.skipUnless(
        sys.maxsize < 2**32, 'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = threading_helper.threading_setup()
        create_file(os_helper.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        threading_helper.threading_cleanup(*cls.key)
        os_helper.unlink(os_helper.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((socket_helper.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(os_helper.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call self.server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # ECONNRESET (disconnected) and any other error are fatal
                    raise
                # EAGAIN / EBUSY: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                    offset=0, count=4096)
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
                        offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first for a readable failure),
        # as the other tests do, instead of comparing hash() values.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = os_helper.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(os_helper.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are sent before the trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY / EAGAIN are tolerated here; anything else is a failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003482
3483
Larry Hastings9cf065c2012-06-22 16:30:09 -07003484def supports_extended_attributes():
3485 if not hasattr(os, "setxattr"):
3486 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003487
Larry Hastings9cf065c2012-06-22 16:30:09 -07003488 try:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003489 with open(os_helper.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003490 try:
3491 os.setxattr(fp.fileno(), b"user.test", b"")
3492 except OSError:
3493 return False
3494 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003495 os_helper.unlink(os_helper.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003496
3497 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003498
3499
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Round-trip tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # `s` converts the attribute names handed to the functions under test
        # (callers pass str and os.fsencode); kwargs are forwarded to every
        # get/set/remove call but not to listxattr.
        path = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, path)
        create_file(path)

        def assert_fails_with(code, func, *args):
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        assert_fails_with(errno.ENODATA, getxattr, s("user.test"))

        init_xattr = listxattr(path)
        self.assertIsInstance(init_xattr, list)

        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(init_xattr)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing attribute must fail ...
        assert_fails_with(errno.EEXIST, setxattr,
                          s("user.test"), b"bye", os.XATTR_CREATE)
        # ... and so must XATTR_REPLACE on a missing one.
        assert_fails_with(errno.ENODATA, setxattr,
                          s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        assert_fails_with(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        big_value = b"a" * 1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)
        many = sorted(f"user.test{i}" for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the whole scenario once with str names and once with bytes
        # names, starting each pass from a fresh file.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            os_helper.unlink(os_helper.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each os function so it is called with a file descriptor
        # obtained by opening the path, instead of with the path itself.
        def via_fd(func, mode, *open_args):
            def call(path, *args):
                with open(path, mode, *open_args) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb", 0),
                           via_fd(os.removexattr, "wb", 0),
                           via_fd(os.listxattr, "rb"))
3583
3584
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _query_or_skip(self, query):
        # Call query() and return its result; skip the test when the terminal
        # size cannot be determined, re-raise any other OSError.
        try:
            return query()
        except OSError as exc:
            if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._query_or_skip(os.get_terminal_size)

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(
                ["stty", "size"], stderr=subprocess.DEVNULL, text=True
            )
            size = stty_output.split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        # stty prints the two numbers in the reverse order of the tuple
        # returned by os.get_terminal_size()
        expected = (int(size[1]), int(size[0]))

        actual = self._query_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
3632
3633
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # With an explicit MFD_CLOEXEC flag
        explicit_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(explicit_fd, -1)
        self.addCleanup(os.close, explicit_fd)
        self.assertFalse(os.get_inheritable(explicit_fd))
        # closefd=False: the descriptor is closed by the cleanup above
        with open(explicit_fd, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Without flags the descriptor must be non-inheritable as well
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3649
3650
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
    def test_eventfd_initval(self):
        # Pack as native uint64_t
        pack = struct.Struct("@Q").pack
        nbytes = 8  # read/write 8 bytes
        initval = 42
        fd = os.eventfd(initval)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)
        self.assertFalse(os.get_inheritable(fd))

        # test with raw read/write
        self.assertEqual(os.read(fd, nbytes), pack(initval))

        os.write(fd, pack(23))
        self.assertEqual(os.read(fd, nbytes), pack(23))

        # two writes before a read: the read returns their sum
        os.write(fd, pack(40))
        os.write(fd, pack(2))
        self.assertEqual(os.read(fd, nbytes), pack(42))

        # test with eventfd_read/eventfd_write
        os.eventfd_write(fd, 20)
        os.eventfd_write(fd, 3)
        self.assertEqual(os.eventfd_read(fd), 23)

    def test_eventfd_semaphore(self):
        initval = 2
        flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
        fd = os.eventfd(initval, flags)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        # semaphore starts with initval 2, two reads return '1'
        for _ in range(2):
            self.assertEqual(os.eventfd_read(fd), 1)
        # third read would block
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)
        with self.assertRaises(BlockingIOError):
            os.read(fd, 8)

        # increase semaphore counter, read one
        os.eventfd_write(fd, 1)
        self.assertEqual(os.eventfd_read(fd), 1)
        # next read would block, too
        with self.assertRaises(BlockingIOError):
            os.eventfd_read(fd)

    def test_eventfd_select(self):
        flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
        fd = os.eventfd(0, flags)
        self.assertNotEqual(fd, -1)
        self.addCleanup(os.close, fd)

        def poll_now():
            # Non-blocking select on fd for reading, writing and errors
            return tuple(select.select([fd], [fd], [fd], 0))

        # counter is zero, only writeable
        self.assertEqual(poll_now(), ([], [fd], []))

        # counter is non-zero, read and writeable
        os.eventfd_write(fd, 23)
        self.assertEqual(poll_now(), ([fd], [fd], []))
        self.assertEqual(os.eventfd_read(fd), 23)

        # counter at max, only readable
        os.eventfd_write(fd, (2**64) - 2)
        self.assertEqual(poll_now(), ([fd], [], []))
        os.eventfd_read(fd)
3732
3733
Victor Stinner292c8352012-10-30 02:17:38 +01003734class OSErrorTests(unittest.TestCase):
3735 def setUp(self):
3736 class Str(str):
3737 pass
3738
Victor Stinnerafe17062012-10-31 22:47:43 +01003739 self.bytes_filenames = []
3740 self.unicode_filenames = []
Hai Shi0c4f0f32020-06-30 21:46:31 +08003741 if os_helper.TESTFN_UNENCODABLE is not None:
3742 decoded = os_helper.TESTFN_UNENCODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003743 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003744 decoded = os_helper.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003745 self.unicode_filenames.append(decoded)
3746 self.unicode_filenames.append(Str(decoded))
Hai Shi0c4f0f32020-06-30 21:46:31 +08003747 if os_helper.TESTFN_UNDECODABLE is not None:
3748 encoded = os_helper.TESTFN_UNDECODABLE
Victor Stinner292c8352012-10-30 02:17:38 +01003749 else:
Hai Shi0c4f0f32020-06-30 21:46:31 +08003750 encoded = os.fsencode(os_helper.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003751 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003752 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003753 self.bytes_filenames.append(memoryview(encoded))
3754
3755 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003756
3757 def test_oserror_filename(self):
3758 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003759 (self.filenames, os.chdir,),
3760 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003761 (self.filenames, os.lstat,),
3762 (self.filenames, os.open, os.O_RDONLY),
3763 (self.filenames, os.rmdir,),
3764 (self.filenames, os.stat,),
3765 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003766 ]
3767 if sys.platform == "win32":
3768 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003769 (self.bytes_filenames, os.rename, b"dst"),
3770 (self.bytes_filenames, os.replace, b"dst"),
3771 (self.unicode_filenames, os.rename, "dst"),
3772 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003773 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003774 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003775 else:
3776 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003777 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003778 (self.filenames, os.rename, "dst"),
3779 (self.filenames, os.replace, "dst"),
3780 ))
3781 if hasattr(os, "chown"):
3782 funcs.append((self.filenames, os.chown, 0, 0))
3783 if hasattr(os, "lchown"):
3784 funcs.append((self.filenames, os.lchown, 0, 0))
3785 if hasattr(os, "truncate"):
3786 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003787 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003788 funcs.append((self.filenames, os.chflags, 0))
3789 if hasattr(os, "lchflags"):
3790 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003791 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003792 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003793 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003794 if sys.platform == "win32":
3795 funcs.append((self.bytes_filenames, os.link, b"dst"))
3796 funcs.append((self.unicode_filenames, os.link, "dst"))
3797 else:
3798 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003799 if hasattr(os, "listxattr"):
3800 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003801 (self.filenames, os.listxattr,),
3802 (self.filenames, os.getxattr, "user.test"),
3803 (self.filenames, os.setxattr, "user.test", b'user'),
3804 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003805 ))
3806 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003807 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003808 if hasattr(os, "readlink"):
Steve Dower75e06492019-08-21 13:43:06 -07003809 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003810
Steve Dowercc16be82016-09-08 10:35:16 -07003811
Victor Stinnerafe17062012-10-31 22:47:43 +01003812 for filenames, func, *func_args in funcs:
3813 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003814 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003815 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003816 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003817 else:
3818 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3819 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003820 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003821 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003822 except UnicodeDecodeError:
3823 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003824 else:
3825 self.fail("No exception thrown by {}".format(func))
3826
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3835
Victor Stinnerdaf45552013-08-28 00:53:59 +02003836
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed on cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()

        def cloexec_bit():
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        # FD_CLOEXEC is set on a freshly opened descriptor ...
        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        # ... and cleared once the descriptor is made inheritable
        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    def test_dup_standard_stream(self):
        fd = os.dup(1)
        self.addCleanup(os.close, fd)
        self.assertGreater(fd, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        copy_fd = os.dup(nul_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, target_fd), target_fd)
        self.assertTrue(os.get_inheritable(target_fd))

        # force non-inheritable
        private_fd = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, private_fd, inheritable=False),
                         private_fd)
        self.assertFalse(os.get_inheritable(private_fd))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3929
3930
class PathTConverterTests(unittest.TestCase):
    # Each row describes one os function to exercise:
    # (function name, allows fd arguments, additional arguments to
    # function, cleanup function applied to the call's result or None)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        # Call every listed function with str, bytes and path-like
        # arguments, then check how it reacts to a path-like object
        # wrapping an int and to a raw file descriptor.
        str_filename = os_helper.TESTFN
        bytes_filename = None
        bytes_fspath = None
        if os.name != 'nt':
            # bytes paths are only exercised outside of Windows
            bytes_filename = os.fsencode(os_helper.TESTFN)
            bytes_fspath = FakePath(bytes_filename)

        fd = os.open(FakePath(str_filename), os.O_WRONLY | os.O_CREAT)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        valid_paths = [item for item in candidates if item is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # function not available on this platform
                    continue

                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # __fspath__() returning an int must be rejected
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)

    def test_path_t_converter_and_custom_class(self):
        # The error message must name the type __fspath__() returned.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        bad_returns = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for bad_value, type_name in bad_returns:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(bad_value))
3995
Brett Cannon3f9183b2016-08-26 14:44:48 -07003996
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        # A freshly opened descriptor is expected to report blocking mode;
        # afterwards the mode is switched off and on again, reading it
        # back after each change.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
4010
4011
Yury Selivanov97e2e062014-09-26 12:33:06 -04004012
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Spot-check that a couple of public names are listed in os.__all__.
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
4017
4018
class TestDirEntry(unittest.TestCase):
    # Checks on the os.DirEntry type itself: it can neither be instantiated
    # directly nor pickled.

    def setUp(self):
        # Work inside a scratch directory that is removed after each test.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def test_uninstantiable(self):
        self.assertRaises(TypeError, os.DirEntry)

    def test_unpickable(self):
        create_file(os.path.join(self.path, "file.txt"), b'python')
        # Close the iterator deterministically, even if a check fails.
        with os.scandir(self.path) as scandir_iter:
            entry = list(scandir_iter).pop()
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, "file.txt")
        # Pickle with the default protocol.  The second positional argument
        # of pickle.dumps() is the protocol, so nothing unrelated (such as
        # a file name) may be passed there: an invalid protocol raises
        # TypeError by itself and would hide whether the entry is
        # picklable.
        self.assertRaises(TypeError, pickle.dumps, entry)
4035
4036
class TestScandir(unittest.TestCase):
    # Tests for os.scandir() and the os.DirEntry objects it yields.  Every
    # test runs against a scratch directory created in setUp().

    check_no_resource_warning = warnings_helper.check_no_resource_warning

    def setUp(self):
        # self.path / self.bytes_path are the str and bytes spellings of
        # the scratch directory, which is removed again after the test.
        self.path = os.path.realpath(os_helper.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(os_helper.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create a small file in the scratch directory and return its full
        # path, which has the same type (str or bytes) as *name*.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that exactly *names* (a sorted
        # list) are present and return a {name: DirEntry} mapping.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  When *skip_fields* is true only the
        # st_* attributes are compared and st_dev, st_ino and st_nlink are
        # left out of the comparison.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def test_uninstantiable(self):
        # The context manager closes the iterator even if the check fails.
        with os.scandir(self.path) as scandir_iter:
            self.assertRaises(TypeError, type(scandir_iter))

    def test_unpickable(self):
        self.create_file("file.txt")
        with os.scandir(self.path) as scandir_iter:
            # Pickle with the default protocol.  Passing anything that is
            # not an int (such as a file name) as the protocol argument
            # makes pickle.dumps() raise TypeError on its own, which would
            # make this check pass no matter what the iterator supports.
            self.assertRaises(TypeError, pickle.dumps, scandir_iter)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Cross-check a DirEntry against os.stat() and os.path.  Only
        # *is_symlink* is consulted here (to decide which stat fields are
        # compared on Windows); the is_dir()/is_file() results are checked
        # against the stat mode rather than against the *is_dir* and
        # *is_file* arguments.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = os_helper.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the single DirEntry of the scratch directory, scanning it
        # through the bytes path when *name* is bytes.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields for it.
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not os_helper.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Remove the target so that the symlink becomes dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            # name and path must be real bytes objects, whatever bytes-like
            # type the directory was given as
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if os_helper.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # when scanning a file descriptor, path is just the name
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            # closing explicitly inside the with block must be harmless
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that is never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
4381
Victor Stinner6036e442015-03-08 01:58:04 +01004382
class TestPEP519(unittest.TestCase):

    # The function under test is reached through this attribute so that a
    # subclass can substitute the pure Python implementation when a C
    # version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes arguments come back as an equal value
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str arguments come back as an equal value
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        as_str = "path/like/object"
        as_bytes = b"path/like/object"
        for raw in (as_str, as_bytes):
            wrapped = FakePath(raw)

            self.assertEqual(raw, self.fspath(wrapped))
            self.assertEqual(as_bytes, os.fsencode(wrapped))
            self.assertEqual(as_str, os.fsdecode(wrapped))

    def test_pathlike(self):
        wrapped = FakePath('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(wrapped))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are not str, bytes or path-like are rejected.
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, junk)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        raising = FakePath(ZeroDivisionError())
        self.assertRaises(ZeroDivisionError, self.fspath, raising)

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))

    def test_pathlike_class_getitem(self):
        alias = os.PathLike[bytes]
        self.assertIsInstance(alias, types.GenericAlias)
Batuhan Taşkaya526606b2019-12-08 23:31:15 +03004439
Victor Stinnerc29b5852017-11-02 07:28:27 -07004440
class TimesTests(unittest.TestCase):
    def test_times(self):
        # os.times() returns an os.times_result whose five fields are
        # floats.
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        float_fields = ('user', 'system', 'children_user',
                        'children_system', 'elapsed')
        for field_name in float_fields:
            self.assertIsInstance(getattr(result, field_name), float)

        if os.name != 'nt':
            return

        # On Windows these three fields are expected to be zero.
        for zero_field in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(result, zero_field), 0)
4455
4456
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Every inherited test calls self.fspath, so rebinding it here runs
        # the whole TestPEP519 suite against os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07004465
4466
# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()