blob: 18cd78b55f601906aa15fa524abbf9a70e701364 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
Antoine Pitrouec34ab52013-08-16 20:44:38 +020033try:
34 import resource
35except ImportError:
36 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020037try:
38 import fcntl
39except ImportError:
40 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010041try:
42 import _winapi
43except ImportError:
44 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020045try:
R David Murrayf2ad1732014-12-25 18:36:56 -050046 import pwd
47 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010048except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050049 all_users = []
50try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020051 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020052except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020053 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020054
Berker Peksagce643912015-05-06 06:33:17 +030055from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020056from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000057
Victor Stinner923590e2016-03-24 09:11:48 +010058
R David Murrayf2ad1732014-12-25 18:36:56 -050059root_in_posix = False
60if hasattr(os, 'geteuid'):
61 root_in_posix = (os.geteuid() == 0)
62
Mark Dickinson7cf03892010-04-16 13:45:35 +000063# Detect whether we're on a Linux system that uses the (now outdated
64# and unmaintained) linuxthreads threading library. There's an issue
65# when combining linuxthreads with a failed execv call: see
66# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020067if hasattr(sys, 'thread_info') and sys.thread_info.version:
68 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
69else:
70 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000071
Stefan Krahebee49a2013-01-17 15:31:00 +010072# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
73HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
74
Victor Stinner923590e2016-03-24 09:11:48 +010075
Berker Peksag4af23d72016-09-15 20:32:44 +030076def requires_os_func(name):
77 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
78
79
Victor Stinnerae39d232016-03-24 17:12:55 +010080def create_file(filename, content=b'content'):
81 with open(filename, "xb", 0) as fp:
82 fp.write(content)
83
84
Victor Stinner689830e2019-06-26 17:31:12 +020085class MiscTests(unittest.TestCase):
86 def test_getcwd(self):
87 cwd = os.getcwd()
88 self.assertIsInstance(cwd, str)
89
90 def test_getcwdb(self):
91 cwd = os.getcwdb()
92 self.assertIsInstance(cwd, bytes)
93 self.assertEqual(os.fsdecode(cwd), os.getcwd())
94
95
Thomas Wouters0e3f5912006-08-11 14:57:12 +000096# Tests creating TESTFN
97class FileTests(unittest.TestCase):
98 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000099 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000100 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000101 tearDown = setUp
102
103 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000104 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000105 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000106 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000107
Christian Heimesfdab48e2008-01-20 09:06:41 +0000108 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000109 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
110 # We must allocate two consecutive file descriptors, otherwise
111 # it will mess up other file descriptors (perhaps even the three
112 # standard ones).
113 second = os.dup(first)
114 try:
115 retries = 0
116 while second != first + 1:
117 os.close(first)
118 retries += 1
119 if retries > 10:
120 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000121 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000122 first, second = second, os.dup(second)
123 finally:
124 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000125 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000126 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000127 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000128
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000129 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000130 def test_rename(self):
131 path = support.TESTFN
132 old = sys.getrefcount(path)
133 self.assertRaises(TypeError, os.rename, path, 0)
134 new = sys.getrefcount(path)
135 self.assertEqual(old, new)
136
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000137 def test_read(self):
138 with open(support.TESTFN, "w+b") as fobj:
139 fobj.write(b"spam")
140 fobj.flush()
141 fd = fobj.fileno()
142 os.lseek(fd, 0, 0)
143 s = os.read(fd, 4)
144 self.assertEqual(type(s), bytes)
145 self.assertEqual(s, b"spam")
146
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200147 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200148 # Skip the test on 32-bit platforms: the number of bytes must fit in a
149 # Py_ssize_t type
150 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
151 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200152 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
153 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200154 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100155 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200156
157 # Issue #21932: Make sure that os.read() does not raise an
158 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200159 with open(support.TESTFN, "rb") as fp:
160 data = os.read(fp.fileno(), size)
161
Victor Stinner8c663fd2017-11-08 14:44:44 -0800162 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200163 # operating system is free to return less bytes than requested.
164 self.assertEqual(data, b'test')
165
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000166 def test_write(self):
167 # os.write() accepts bytes- and buffer-like objects but not strings
168 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
169 self.assertRaises(TypeError, os.write, fd, "beans")
170 os.write(fd, b"bacon\n")
171 os.write(fd, bytearray(b"eggs\n"))
172 os.write(fd, memoryview(b"spam\n"))
173 os.close(fd)
174 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000175 self.assertEqual(fobj.read().splitlines(),
176 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000177
Victor Stinnere0daff12011-03-20 23:36:35 +0100178 def write_windows_console(self, *args):
179 retcode = subprocess.call(args,
180 # use a new console to not flood the test output
181 creationflags=subprocess.CREATE_NEW_CONSOLE,
182 # use a shell to hide the console window (SW_HIDE)
183 shell=True)
184 self.assertEqual(retcode, 0)
185
186 @unittest.skipUnless(sys.platform == 'win32',
187 'test specific to the Windows console')
188 def test_write_windows_console(self):
189 # Issue #11395: the Windows console returns an error (12: not enough
190 # space error) on writing into stdout if stdout mode is binary and the
191 # length is greater than 66,000 bytes (or less, depending on heap
192 # usage).
193 code = "print('x' * 100000)"
194 self.write_windows_console(sys.executable, "-c", code)
195 self.write_windows_console(sys.executable, "-u", "-c", code)
196
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000197 def fdopen_helper(self, *args):
198 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200199 f = os.fdopen(fd, *args)
200 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000201
202 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200203 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
204 os.close(fd)
205
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000206 self.fdopen_helper()
207 self.fdopen_helper('r')
208 self.fdopen_helper('r', 100)
209
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100210 def test_replace(self):
211 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100212 self.addCleanup(support.unlink, support.TESTFN)
213 self.addCleanup(support.unlink, TESTFN2)
214
215 create_file(support.TESTFN, b"1")
216 create_file(TESTFN2, b"2")
217
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100218 os.replace(support.TESTFN, TESTFN2)
219 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
220 with open(TESTFN2, 'r') as f:
221 self.assertEqual(f.read(), "1")
222
Martin Panterbf19d162015-09-09 01:01:13 +0000223 def test_open_keywords(self):
224 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
225 dir_fd=None)
226 os.close(f)
227
228 def test_symlink_keywords(self):
229 symlink = support.get_attribute(os, "symlink")
230 try:
231 symlink(src='target', dst=support.TESTFN,
232 target_is_directory=False, dir_fd=None)
233 except (NotImplementedError, OSError):
234 pass # No OS support or unprivileged user
235
Pablo Galindoaac4d032019-05-31 19:39:47 +0100236 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
237 def test_copy_file_range_invalid_values(self):
238 with self.assertRaises(ValueError):
239 os.copy_file_range(0, 1, -10)
240
241 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
242 def test_copy_file_range(self):
243 TESTFN2 = support.TESTFN + ".3"
244 data = b'0123456789'
245
246 create_file(support.TESTFN, data)
247 self.addCleanup(support.unlink, support.TESTFN)
248
249 in_file = open(support.TESTFN, 'rb')
250 self.addCleanup(in_file.close)
251 in_fd = in_file.fileno()
252
253 out_file = open(TESTFN2, 'w+b')
254 self.addCleanup(support.unlink, TESTFN2)
255 self.addCleanup(out_file.close)
256 out_fd = out_file.fileno()
257
258 try:
259 i = os.copy_file_range(in_fd, out_fd, 5)
260 except OSError as e:
261 # Handle the case in which Python was compiled
262 # in a system with the syscall but without support
263 # in the kernel.
264 if e.errno != errno.ENOSYS:
265 raise
266 self.skipTest(e)
267 else:
268 # The number of copied bytes can be less than
269 # the number of bytes originally requested.
270 self.assertIn(i, range(0, 6));
271
272 with open(TESTFN2, 'rb') as in_file:
273 self.assertEqual(in_file.read(), data[:i])
274
275 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
276 def test_copy_file_range_offset(self):
277 TESTFN4 = support.TESTFN + ".4"
278 data = b'0123456789'
279 bytes_to_copy = 6
280 in_skip = 3
281 out_seek = 5
282
283 create_file(support.TESTFN, data)
284 self.addCleanup(support.unlink, support.TESTFN)
285
286 in_file = open(support.TESTFN, 'rb')
287 self.addCleanup(in_file.close)
288 in_fd = in_file.fileno()
289
290 out_file = open(TESTFN4, 'w+b')
291 self.addCleanup(support.unlink, TESTFN4)
292 self.addCleanup(out_file.close)
293 out_fd = out_file.fileno()
294
295 try:
296 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
297 offset_src=in_skip,
298 offset_dst=out_seek)
299 except OSError as e:
300 # Handle the case in which Python was compiled
301 # in a system with the syscall but without support
302 # in the kernel.
303 if e.errno != errno.ENOSYS:
304 raise
305 self.skipTest(e)
306 else:
307 # The number of copied bytes can be less than
308 # the number of bytes originally requested.
309 self.assertIn(i, range(0, bytes_to_copy+1));
310
311 with open(TESTFN4, 'rb') as in_file:
312 read = in_file.read()
313 # seeked bytes (5) are zero'ed
314 self.assertEqual(read[:out_seek], b'\x00'*out_seek)
315 # 012 are skipped (in_skip)
316 # 345678 are copied in the file (in_skip + bytes_to_copy)
317 self.assertEqual(read[out_seek:],
318 data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200319
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000320# Test attributes on return values from os.*stat* family.
321class StatAttributeTests(unittest.TestCase):
322 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200323 self.fname = support.TESTFN
324 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100325 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000326
Antoine Pitrou38425292010-09-21 18:19:07 +0000327 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000328 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000329
330 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000331 self.assertEqual(result[stat.ST_SIZE], 3)
332 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000334 # Make sure all the attributes are there
335 members = dir(result)
336 for name in dir(stat):
337 if name[:3] == 'ST_':
338 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000339 if name.endswith("TIME"):
340 def trunc(x): return int(x)
341 else:
342 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000344 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000345 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000346
Larry Hastings6fe20b32012-04-19 15:07:49 -0700347 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700348 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700349 for name in 'st_atime st_mtime st_ctime'.split():
350 floaty = int(getattr(result, name) * 100000)
351 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700352 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700353
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354 try:
355 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200356 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000357 except IndexError:
358 pass
359
360 # Make sure that assignment fails
361 try:
362 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200363 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000364 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 pass
366
367 try:
368 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200369 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000370 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000371 pass
372
373 try:
374 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200375 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000376 except AttributeError:
377 pass
378
379 # Use the stat_result constructor with a too-short tuple.
380 try:
381 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200382 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383 except TypeError:
384 pass
385
Ezio Melotti42da6632011-03-15 05:18:48 +0200386 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387 try:
388 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
389 except TypeError:
390 pass
391
Antoine Pitrou38425292010-09-21 18:19:07 +0000392 def test_stat_attributes(self):
393 self.check_stat_attributes(self.fname)
394
395 def test_stat_attributes_bytes(self):
396 try:
397 fname = self.fname.encode(sys.getfilesystemencoding())
398 except UnicodeEncodeError:
399 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700400 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000401
Christian Heimes25827622013-10-12 01:27:08 +0200402 def test_stat_result_pickle(self):
403 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200404 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
405 p = pickle.dumps(result, proto)
406 self.assertIn(b'stat_result', p)
407 if proto < 4:
408 self.assertIn(b'cos\nstat_result\n', p)
409 unpickled = pickle.loads(p)
410 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200411
Serhiy Storchaka43767632013-11-03 21:31:38 +0200412 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000413 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700414 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000415
416 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000417 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000418
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000419 # Make sure all the attributes are there.
420 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
421 'ffree', 'favail', 'flag', 'namemax')
422 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000423 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000424
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100425 self.assertTrue(isinstance(result.f_fsid, int))
426
427 # Test that the size of the tuple doesn't change
428 self.assertEqual(len(result), 10)
429
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000430 # Make sure that assignment really fails
431 try:
432 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200433 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000434 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000435 pass
436
437 try:
438 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200439 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000440 except AttributeError:
441 pass
442
443 # Use the constructor with a too-short tuple.
444 try:
445 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200446 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000447 except TypeError:
448 pass
449
Ezio Melotti42da6632011-03-15 05:18:48 +0200450 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000451 try:
452 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
453 except TypeError:
454 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000455
Christian Heimes25827622013-10-12 01:27:08 +0200456 @unittest.skipUnless(hasattr(os, 'statvfs'),
457 "need os.statvfs()")
458 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700459 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200460
Serhiy Storchakabad12572014-12-15 14:03:42 +0200461 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
462 p = pickle.dumps(result, proto)
463 self.assertIn(b'statvfs_result', p)
464 if proto < 4:
465 self.assertIn(b'cos\nstatvfs_result\n', p)
466 unpickled = pickle.loads(p)
467 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200468
Serhiy Storchaka43767632013-11-03 21:31:38 +0200469 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
470 def test_1686475(self):
471 # Verify that an open file can be stat'ed
472 try:
473 os.stat(r"c:\pagefile.sys")
474 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600475 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200476 except OSError as e:
477 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000478
Serhiy Storchaka43767632013-11-03 21:31:38 +0200479 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
480 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
481 def test_15261(self):
482 # Verify that stat'ing a closed fd does not cause crash
483 r, w = os.pipe()
484 try:
485 os.stat(r) # should not raise error
486 finally:
487 os.close(r)
488 os.close(w)
489 with self.assertRaises(OSError) as ctx:
490 os.stat(r)
491 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100492
Zachary Ware63f277b2014-06-19 09:46:37 -0500493 def check_file_attributes(self, result):
494 self.assertTrue(hasattr(result, 'st_file_attributes'))
495 self.assertTrue(isinstance(result.st_file_attributes, int))
496 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
497
498 @unittest.skipUnless(sys.platform == "win32",
499 "st_file_attributes is Win32 specific")
500 def test_file_attributes(self):
501 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
502 result = os.stat(self.fname)
503 self.check_file_attributes(result)
504 self.assertEqual(
505 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
506 0)
507
508 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200509 dirname = support.TESTFN + "dir"
510 os.mkdir(dirname)
511 self.addCleanup(os.rmdir, dirname)
512
513 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500514 self.check_file_attributes(result)
515 self.assertEqual(
516 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
517 stat.FILE_ATTRIBUTE_DIRECTORY)
518
Berker Peksag0b4dc482016-09-17 15:49:59 +0300519 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
520 def test_access_denied(self):
521 # Default to FindFirstFile WIN32_FIND_DATA when access is
522 # denied. See issue 28075.
523 # os.environ['TEMP'] should be located on a volume that
524 # supports file ACLs.
525 fname = os.path.join(os.environ['TEMP'], self.fname)
526 self.addCleanup(support.unlink, fname)
527 create_file(fname, b'ABC')
528 # Deny the right to [S]YNCHRONIZE on the file to
529 # force CreateFile to fail with ERROR_ACCESS_DENIED.
530 DETACHED_PROCESS = 8
531 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500532 # bpo-30584: Use security identifier *S-1-5-32-545 instead
533 # of localized "Users" to not depend on the locale.
534 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300535 creationflags=DETACHED_PROCESS
536 )
537 result = os.stat(fname)
538 self.assertNotEqual(result.st_size, 0)
539
Victor Stinner47aacc82015-06-12 17:26:23 +0200540
541class UtimeTests(unittest.TestCase):
542 def setUp(self):
543 self.dirname = support.TESTFN
544 self.fname = os.path.join(self.dirname, "f1")
545
546 self.addCleanup(support.rmtree, self.dirname)
547 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100548 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200549
Victor Stinner47aacc82015-06-12 17:26:23 +0200550 def support_subsecond(self, filename):
551 # Heuristic to check if the filesystem supports timestamp with
552 # subsecond resolution: check if float and int timestamps are different
553 st = os.stat(filename)
554 return ((st.st_atime != st[7])
555 or (st.st_mtime != st[8])
556 or (st.st_ctime != st[9]))
557
558 def _test_utime(self, set_time, filename=None):
559 if not filename:
560 filename = self.fname
561
562 support_subsecond = self.support_subsecond(filename)
563 if support_subsecond:
564 # Timestamp with a resolution of 1 microsecond (10^-6).
565 #
566 # The resolution of the C internal function used by os.utime()
567 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
568 # test with a resolution of 1 ns requires more work:
569 # see the issue #15745.
570 atime_ns = 1002003000 # 1.002003 seconds
571 mtime_ns = 4005006000 # 4.005006 seconds
572 else:
573 # use a resolution of 1 second
574 atime_ns = 5 * 10**9
575 mtime_ns = 8 * 10**9
576
577 set_time(filename, (atime_ns, mtime_ns))
578 st = os.stat(filename)
579
580 if support_subsecond:
581 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
582 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
583 else:
584 self.assertEqual(st.st_atime, atime_ns * 1e-9)
585 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
586 self.assertEqual(st.st_atime_ns, atime_ns)
587 self.assertEqual(st.st_mtime_ns, mtime_ns)
588
589 def test_utime(self):
590 def set_time(filename, ns):
591 # test the ns keyword parameter
592 os.utime(filename, ns=ns)
593 self._test_utime(set_time)
594
595 @staticmethod
596 def ns_to_sec(ns):
597 # Convert a number of nanosecond (int) to a number of seconds (float).
598 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
599 # issue, os.utime() rounds towards minus infinity.
600 return (ns * 1e-9) + 0.5e-9
601
602 def test_utime_by_indexed(self):
603 # pass times as floating point seconds as the second indexed parameter
604 def set_time(filename, ns):
605 atime_ns, mtime_ns = ns
606 atime = self.ns_to_sec(atime_ns)
607 mtime = self.ns_to_sec(mtime_ns)
608 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
609 # or utime(time_t)
610 os.utime(filename, (atime, mtime))
611 self._test_utime(set_time)
612
613 def test_utime_by_times(self):
614 def set_time(filename, ns):
615 atime_ns, mtime_ns = ns
616 atime = self.ns_to_sec(atime_ns)
617 mtime = self.ns_to_sec(mtime_ns)
618 # test the times keyword parameter
619 os.utime(filename, times=(atime, mtime))
620 self._test_utime(set_time)
621
622 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
623 "follow_symlinks support for utime required "
624 "for this test.")
625 def test_utime_nofollow_symlinks(self):
626 def set_time(filename, ns):
627 # use follow_symlinks=False to test utimensat(timespec)
628 # or lutimes(timeval)
629 os.utime(filename, ns=ns, follow_symlinks=False)
630 self._test_utime(set_time)
631
632 @unittest.skipUnless(os.utime in os.supports_fd,
633 "fd support for utime required for this test.")
634 def test_utime_fd(self):
635 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100636 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200637 # use a file descriptor to test futimens(timespec)
638 # or futimes(timeval)
639 os.utime(fp.fileno(), ns=ns)
640 self._test_utime(set_time)
641
642 @unittest.skipUnless(os.utime in os.supports_dir_fd,
643 "dir_fd support for utime required for this test.")
644 def test_utime_dir_fd(self):
645 def set_time(filename, ns):
646 dirname, name = os.path.split(filename)
647 dirfd = os.open(dirname, os.O_RDONLY)
648 try:
649 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
650 os.utime(name, dir_fd=dirfd, ns=ns)
651 finally:
652 os.close(dirfd)
653 self._test_utime(set_time)
654
655 def test_utime_directory(self):
656 def set_time(filename, ns):
657 # test calling os.utime() on a directory
658 os.utime(filename, ns=ns)
659 self._test_utime(set_time, filename=self.dirname)
660
661 def _test_utime_current(self, set_time):
662 # Get the system clock
663 current = time.time()
664
665 # Call os.utime() to set the timestamp to the current system clock
666 set_time(self.fname)
667
668 if not self.support_subsecond(self.fname):
669 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700670 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200671 # On Windows, the usual resolution of time.time() is 15.6 ms.
672 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700673 #
674 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
675 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200676 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200677 st = os.stat(self.fname)
678 msg = ("st_time=%r, current=%r, dt=%r"
679 % (st.st_mtime, current, st.st_mtime - current))
680 self.assertAlmostEqual(st.st_mtime, current,
681 delta=delta, msg=msg)
682
683 def test_utime_current(self):
684 def set_time(filename):
685 # Set to the current time in the new way
686 os.utime(self.fname)
687 self._test_utime_current(set_time)
688
689 def test_utime_current_old(self):
690 def set_time(filename):
691 # Set to the current time in the old explicit way.
692 os.utime(self.fname, None)
693 self._test_utime_current(set_time)
694
695 def get_file_system(self, path):
696 if sys.platform == 'win32':
697 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
698 import ctypes
699 kernel32 = ctypes.windll.kernel32
700 buf = ctypes.create_unicode_buffer("", 100)
701 ok = kernel32.GetVolumeInformationW(root, None, 0,
702 None, None, None,
703 buf, len(buf))
704 if ok:
705 return buf.value
706 # return None if the filesystem is unknown
707
708 def test_large_time(self):
709 # Many filesystems are limited to the year 2038. At least, the test
710 # pass with NTFS filesystem.
711 if self.get_file_system(self.dirname) != "NTFS":
712 self.skipTest("requires NTFS")
713
714 large = 5000000000 # some day in 2128
715 os.utime(self.fname, (large, large))
716 self.assertEqual(os.stat(self.fname).st_mtime, large)
717
718 def test_utime_invalid_arguments(self):
719 # seconds and nanoseconds parameters are mutually exclusive
720 with self.assertRaises(ValueError):
721 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200722 with self.assertRaises(TypeError):
723 os.utime(self.fname, [5, 5])
724 with self.assertRaises(TypeError):
725 os.utime(self.fname, (5,))
726 with self.assertRaises(TypeError):
727 os.utime(self.fname, (5, 5, 5))
728 with self.assertRaises(TypeError):
729 os.utime(self.fname, ns=[5, 5])
730 with self.assertRaises(TypeError):
731 os.utime(self.fname, ns=(5,))
732 with self.assertRaises(TypeError):
733 os.utime(self.fname, ns=(5, 5, 5))
734
735 if os.utime not in os.supports_follow_symlinks:
736 with self.assertRaises(NotImplementedError):
737 os.utime(self.fname, (5, 5), follow_symlinks=False)
738 if os.utime not in os.supports_fd:
739 with open(self.fname, 'wb', 0) as fp:
740 with self.assertRaises(TypeError):
741 os.utime(fp.fileno(), (5, 5))
742 if os.utime not in os.supports_dir_fd:
743 with self.assertRaises(NotImplementedError):
744 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200745
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300746 @support.cpython_only
747 def test_issue31577(self):
748 # The interpreter shouldn't crash in case utime() received a bad
749 # ns argument.
750 def get_bad_int(divmod_ret_val):
751 class BadInt:
752 def __divmod__(*args):
753 return divmod_ret_val
754 return BadInt()
755 with self.assertRaises(TypeError):
756 os.utime(self.fname, ns=(get_bad_int(42), 1))
757 with self.assertRaises(TypeError):
758 os.utime(self.fname, ns=(get_bad_int(()), 1))
759 with self.assertRaises(TypeError):
760 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
761
Victor Stinner47aacc82015-06-12 17:26:23 +0200762
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000763from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000764
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000765class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000766 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000767 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000768
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000769 def setUp(self):
770 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000771 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000772 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000773 for key, value in self._reference().items():
774 os.environ[key] = value
775
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000776 def tearDown(self):
777 os.environ.clear()
778 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000779 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000780 os.environb.clear()
781 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000782
Christian Heimes90333392007-11-01 19:08:42 +0000783 def _reference(self):
784 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
785
786 def _empty_mapping(self):
787 os.environ.clear()
788 return os.environ
789
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000790 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200791 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
792 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000793 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000794 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300795 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200796 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300797 value = popen.read().strip()
798 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000799
Xavier de Gayed1415312016-07-22 12:15:29 +0200800 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
801 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000802 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200803 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
804 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300805 it = iter(popen)
806 self.assertEqual(next(it), "line1\n")
807 self.assertEqual(next(it), "line2\n")
808 self.assertEqual(next(it), "line3\n")
809 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000810
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000811 # Verify environ keys and values from the OS are of the
812 # correct str type.
813 def test_keyvalue_types(self):
814 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000815 self.assertEqual(type(key), str)
816 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000817
Christian Heimes90333392007-11-01 19:08:42 +0000818 def test_items(self):
819 for key, value in self._reference().items():
820 self.assertEqual(os.environ.get(key), value)
821
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000822 # Issue 7310
823 def test___repr__(self):
824 """Check that the repr() of os.environ looks like environ({...})."""
825 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000826 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
827 '{!r}: {!r}'.format(key, value)
828 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000829
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000830 def test_get_exec_path(self):
831 defpath_list = os.defpath.split(os.pathsep)
832 test_path = ['/monty', '/python', '', '/flying/circus']
833 test_env = {'PATH': os.pathsep.join(test_path)}
834
835 saved_environ = os.environ
836 try:
837 os.environ = dict(test_env)
838 # Test that defaulting to os.environ works.
839 self.assertSequenceEqual(test_path, os.get_exec_path())
840 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
841 finally:
842 os.environ = saved_environ
843
844 # No PATH environment variable
845 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
846 # Empty PATH environment variable
847 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
848 # Supplied PATH environment variable
849 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
850
Victor Stinnerb745a742010-05-18 17:17:23 +0000851 if os.supports_bytes_environ:
852 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000853 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000854 # ignore BytesWarning warning
855 with warnings.catch_warnings(record=True):
856 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000857 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000858 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000859 pass
860 else:
861 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000862
863 # bytes key and/or value
864 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
865 ['abc'])
866 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
867 ['abc'])
868 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
869 ['abc'])
870
871 @unittest.skipUnless(os.supports_bytes_environ,
872 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000873 def test_environb(self):
874 # os.environ -> os.environb
875 value = 'euro\u20ac'
876 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000877 value_bytes = value.encode(sys.getfilesystemencoding(),
878 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000879 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000880 msg = "U+20AC character is not encodable to %s" % (
881 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000882 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000883 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000884 self.assertEqual(os.environ['unicode'], value)
885 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000886
887 # os.environb -> os.environ
888 value = b'\xff'
889 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000890 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000891 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000892 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000893
Victor Stinner13ff2452018-01-22 18:32:50 +0100894 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100895 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100896 def test_unset_error(self):
897 if sys.platform == "win32":
898 # an environment variable is limited to 32,767 characters
899 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100900 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100901 else:
902 # "=" is not allowed in a variable name
903 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100904 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100905
Victor Stinner6d101392013-04-14 16:35:04 +0200906 def test_key_type(self):
907 missing = 'missingkey'
908 self.assertNotIn(missing, os.environ)
909
Victor Stinner839e5ea2013-04-14 16:43:03 +0200910 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200911 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200912 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200913 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200914
Victor Stinner839e5ea2013-04-14 16:43:03 +0200915 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200916 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200917 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200918 self.assertTrue(cm.exception.__suppress_context__)
919
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300920 def _test_environ_iteration(self, collection):
921 iterator = iter(collection)
922 new_key = "__new_key__"
923
924 next(iterator) # start iteration over os.environ.items
925
926 # add a new key in os.environ mapping
927 os.environ[new_key] = "test_environ_iteration"
928
929 try:
930 next(iterator) # force iteration over modified mapping
931 self.assertEqual(os.environ[new_key], "test_environ_iteration")
932 finally:
933 del os.environ[new_key]
934
935 def test_iter_error_when_changing_os_environ(self):
936 self._test_environ_iteration(os.environ)
937
938 def test_iter_error_when_changing_os_environ_items(self):
939 self._test_environ_iteration(os.environ.items())
940
941 def test_iter_error_when_changing_os_environ_values(self):
942 self._test_environ_iteration(os.environ.values())
943
Victor Stinner6d101392013-04-14 16:35:04 +0200944
Tim Petersc4e09402003-04-25 07:11:48 +0000945class WalkTests(unittest.TestCase):
946 """Tests for os.walk()."""
947
Victor Stinner0561c532015-03-12 10:28:24 +0100948 # Wrapper to hide minor differences between os.walk and os.fwalk
949 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200950 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200951 if 'follow_symlinks' in kwargs:
952 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200953 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100954
Charles-François Natali7372b062012-02-05 15:15:38 +0100955 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100956 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100957 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000958
959 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000960 # TESTFN/
961 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000962 # tmp1
963 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000964 # tmp2
965 # SUB11/ no kids
966 # SUB2/ a file kid and a dirsymlink kid
967 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300968 # SUB21/ not readable
969 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000970 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200971 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300972 # broken_link2
973 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000974 # TEST2/
975 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100976 self.walk_path = join(support.TESTFN, "TEST1")
977 self.sub1_path = join(self.walk_path, "SUB1")
978 self.sub11_path = join(self.sub1_path, "SUB11")
979 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300980 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100981 tmp1_path = join(self.walk_path, "tmp1")
982 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000983 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300984 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100985 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000986 t2_path = join(support.TESTFN, "TEST2")
987 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200988 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300989 broken_link2_path = join(sub2_path, "broken_link2")
990 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000991
992 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100993 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000994 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300995 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000996 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100997
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300998 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100999 with open(path, "x") as f:
1000 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +00001001
Victor Stinner0561c532015-03-12 10:28:24 +01001002 if support.can_symlink():
1003 os.symlink(os.path.abspath(t2_path), self.link_path)
1004 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001005 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1006 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001007 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001008 ["broken_link", "broken_link2", "broken_link3",
1009 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001010 else:
pxinwr3e028b22019-02-15 13:04:47 +08001011 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +01001012
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +03001013 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001014 try:
1015 os.listdir(sub21_path)
1016 except PermissionError:
1017 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1018 else:
1019 os.chmod(sub21_path, stat.S_IRWXU)
1020 os.unlink(tmp5_path)
1021 os.rmdir(sub21_path)
1022 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +03001023
Victor Stinner0561c532015-03-12 10:28:24 +01001024 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001025 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +03001026 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +01001027
Tim Petersc4e09402003-04-25 07:11:48 +00001028 self.assertEqual(len(all), 4)
1029 # We can't know which order SUB1 and SUB2 will appear in.
1030 # Not flipped: TESTFN, SUB1, SUB11, SUB2
1031 # flipped: TESTFN, SUB2, SUB1, SUB11
1032 flipped = all[0][1][0] != "SUB1"
1033 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +02001034 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001035 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001036 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1037 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1038 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1039 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001040
Brett Cannon3f9183b2016-08-26 14:44:48 -07001041 def test_walk_prune(self, walk_path=None):
1042 if walk_path is None:
1043 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +00001044 # Prune the search.
1045 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -07001046 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +00001047 all.append((root, dirs, files))
1048 # Don't descend into SUB1.
1049 if 'SUB1' in dirs:
1050 # Note that this also mutates the dirs we appended to all!
1051 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +00001052
Victor Stinner0561c532015-03-12 10:28:24 +01001053 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001054 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +01001055
1056 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001057 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001058 self.assertEqual(all[1], self.sub2_tree)
1059
Brett Cannon3f9183b2016-08-26 14:44:48 -07001060 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001061 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -07001062
Victor Stinner0561c532015-03-12 10:28:24 +01001063 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +00001064 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +01001065 all = list(self.walk(self.walk_path, topdown=False))
1066
Victor Stinner53b0a412016-03-26 01:12:36 +01001067 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +00001068 # We can't know which order SUB1 and SUB2 will appear in.
1069 # Not flipped: SUB11, SUB1, SUB2, TESTFN
1070 # flipped: SUB2, SUB11, SUB1, TESTFN
1071 flipped = all[3][1][0] != "SUB1"
1072 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +02001073 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +03001074 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +01001075 self.assertEqual(all[3],
1076 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1077 self.assertEqual(all[flipped],
1078 (self.sub11_path, [], []))
1079 self.assertEqual(all[flipped + 1],
1080 (self.sub1_path, ["SUB11"], ["tmp2"]))
1081 self.assertEqual(all[2 - 2 * flipped],
1082 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001083
Victor Stinner0561c532015-03-12 10:28:24 +01001084 def test_walk_symlink(self):
1085 if not support.can_symlink():
1086 self.skipTest("need symlink support")
1087
1088 # Walk, following symlinks.
1089 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1090 for root, dirs, files in walk_it:
1091 if root == self.link_path:
1092 self.assertEqual(dirs, [])
1093 self.assertEqual(files, ["tmp4"])
1094 break
1095 else:
1096 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001097
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001098 def test_walk_bad_dir(self):
1099 # Walk top-down.
1100 errors = []
1101 walk_it = self.walk(self.walk_path, onerror=errors.append)
1102 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001103 self.assertEqual(errors, [])
1104 dir1 = 'SUB1'
1105 path1 = os.path.join(root, dir1)
1106 path1new = os.path.join(root, dir1 + '.new')
1107 os.rename(path1, path1new)
1108 try:
1109 roots = [r for r, d, f in walk_it]
1110 self.assertTrue(errors)
1111 self.assertNotIn(path1, roots)
1112 self.assertNotIn(path1new, roots)
1113 for dir2 in dirs:
1114 if dir2 != dir1:
1115 self.assertIn(os.path.join(root, dir2), roots)
1116 finally:
1117 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001118
Charles-François Natali7372b062012-02-05 15:15:38 +01001119
1120@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1121class FwalkTests(WalkTests):
1122 """Tests for os.fwalk()."""
1123
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001124 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001125 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001126 yield (root, dirs, files)
1127
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001128 def fwalk(self, *args, **kwargs):
1129 return os.fwalk(*args, **kwargs)
1130
Larry Hastingsc48fe982012-06-25 04:49:05 -07001131 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1132 """
1133 compare with walk() results.
1134 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001135 walk_kwargs = walk_kwargs.copy()
1136 fwalk_kwargs = fwalk_kwargs.copy()
1137 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1138 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1139 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001140
Charles-François Natali7372b062012-02-05 15:15:38 +01001141 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001142 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001143 expected[root] = (set(dirs), set(files))
1144
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001145 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001146 self.assertIn(root, expected)
1147 self.assertEqual(expected[root], (set(dirs), set(files)))
1148
Larry Hastingsc48fe982012-06-25 04:49:05 -07001149 def test_compare_to_walk(self):
1150 kwargs = {'top': support.TESTFN}
1151 self._compare_to_walk(kwargs, kwargs)
1152
Charles-François Natali7372b062012-02-05 15:15:38 +01001153 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001154 try:
1155 fd = os.open(".", os.O_RDONLY)
1156 walk_kwargs = {'top': support.TESTFN}
1157 fwalk_kwargs = walk_kwargs.copy()
1158 fwalk_kwargs['dir_fd'] = fd
1159 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1160 finally:
1161 os.close(fd)
1162
1163 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001164 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001165 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1166 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001167 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001168 # check that the FD is valid
1169 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001170 # redundant check
1171 os.stat(rootfd)
1172 # check that listdir() returns consistent information
1173 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001174
1175 def test_fd_leak(self):
1176 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1177 # we both check that calling fwalk() a large number of times doesn't
1178 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1179 minfd = os.dup(1)
1180 os.close(minfd)
1181 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001182 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001183 pass
1184 newfd = os.dup(1)
1185 self.addCleanup(os.close, newfd)
1186 self.assertEqual(newfd, minfd)
1187
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001188class BytesWalkTests(WalkTests):
1189 """Tests for os.walk() with bytes."""
1190 def walk(self, top, **kwargs):
1191 if 'follow_symlinks' in kwargs:
1192 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1193 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1194 root = os.fsdecode(broot)
1195 dirs = list(map(os.fsdecode, bdirs))
1196 files = list(map(os.fsdecode, bfiles))
1197 yield (root, dirs, files)
1198 bdirs[:] = list(map(os.fsencode, dirs))
1199 bfiles[:] = list(map(os.fsencode, files))
1200
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001201@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1202class BytesFwalkTests(FwalkTests):
1203 """Tests for os.walk() with bytes."""
1204 def fwalk(self, top='.', *args, **kwargs):
1205 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1206 root = os.fsdecode(broot)
1207 dirs = list(map(os.fsdecode, bdirs))
1208 files = list(map(os.fsdecode, bfiles))
1209 yield (root, dirs, files, topfd)
1210 bdirs[:] = list(map(os.fsencode, dirs))
1211 bfiles[:] = list(map(os.fsencode, files))
1212
Charles-François Natali7372b062012-02-05 15:15:38 +01001213
Guido van Rossume7ba4952007-06-06 23:52:48 +00001214class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001215 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001216 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001217
1218 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001219 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001220 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1221 os.makedirs(path) # Should work
1222 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1223 os.makedirs(path)
1224
1225 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001226 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001227 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1228 os.makedirs(path)
1229 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1230 'dir5', 'dir6')
1231 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001232
Serhiy Storchakae304e332017-03-24 13:27:42 +02001233 def test_mode(self):
1234 with support.temp_umask(0o002):
1235 base = support.TESTFN
1236 parent = os.path.join(base, 'dir1')
1237 path = os.path.join(parent, 'dir2')
1238 os.makedirs(path, 0o555)
1239 self.assertTrue(os.path.exists(path))
1240 self.assertTrue(os.path.isdir(path))
1241 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001242 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1243 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001244
Terry Reedy5a22b652010-12-02 07:05:56 +00001245 def test_exist_ok_existing_directory(self):
1246 path = os.path.join(support.TESTFN, 'dir1')
1247 mode = 0o777
1248 old_mask = os.umask(0o022)
1249 os.makedirs(path, mode)
1250 self.assertRaises(OSError, os.makedirs, path, mode)
1251 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001252 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001253 os.makedirs(path, mode=mode, exist_ok=True)
1254 os.umask(old_mask)
1255
Martin Pantera82642f2015-11-19 04:48:44 +00001256 # Issue #25583: A drive root could raise PermissionError on Windows
1257 os.makedirs(os.path.abspath('/'), exist_ok=True)
1258
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001259 def test_exist_ok_s_isgid_directory(self):
1260 path = os.path.join(support.TESTFN, 'dir1')
1261 S_ISGID = stat.S_ISGID
1262 mode = 0o777
1263 old_mask = os.umask(0o022)
1264 try:
1265 existing_testfn_mode = stat.S_IMODE(
1266 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001267 try:
1268 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001269 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001270 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001271 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1272 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1273 # The os should apply S_ISGID from the parent dir for us, but
1274 # this test need not depend on that behavior. Be explicit.
1275 os.makedirs(path, mode | S_ISGID)
1276 # http://bugs.python.org/issue14992
1277 # Should not fail when the bit is already set.
1278 os.makedirs(path, mode, exist_ok=True)
1279 # remove the bit.
1280 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001281 # May work even when the bit is not already set when demanded.
1282 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001283 finally:
1284 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001285
1286 def test_exist_ok_existing_regular_file(self):
1287 base = support.TESTFN
1288 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001289 with open(path, 'w') as f:
1290 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001291 self.assertRaises(OSError, os.makedirs, path)
1292 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1293 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1294 os.remove(path)
1295
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001296 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001297 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001298 'dir4', 'dir5', 'dir6')
1299 # If the tests failed, the bottom-most directory ('../dir6')
1300 # may not have been created, so we look for the outermost directory
1301 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001302 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001303 path = os.path.dirname(path)
1304
1305 os.removedirs(path)
1306
Andrew Svetlov405faed2012-12-25 12:18:09 +02001307
R David Murrayf2ad1732014-12-25 18:36:56 -05001308@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1309class ChownFileTests(unittest.TestCase):
1310
Berker Peksag036a71b2015-07-21 09:29:48 +03001311 @classmethod
1312 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001313 os.mkdir(support.TESTFN)
1314
1315 def test_chown_uid_gid_arguments_must_be_index(self):
1316 stat = os.stat(support.TESTFN)
1317 uid = stat.st_uid
1318 gid = stat.st_gid
1319 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1320 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1321 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1322 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1323 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1324
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001325 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1326 def test_chown_gid(self):
1327 groups = os.getgroups()
1328 if len(groups) < 2:
1329 self.skipTest("test needs at least 2 groups")
1330
R David Murrayf2ad1732014-12-25 18:36:56 -05001331 gid_1, gid_2 = groups[:2]
1332 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001333
R David Murrayf2ad1732014-12-25 18:36:56 -05001334 os.chown(support.TESTFN, uid, gid_1)
1335 gid = os.stat(support.TESTFN).st_gid
1336 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001337
R David Murrayf2ad1732014-12-25 18:36:56 -05001338 os.chown(support.TESTFN, uid, gid_2)
1339 gid = os.stat(support.TESTFN).st_gid
1340 self.assertEqual(gid, gid_2)
1341
1342 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1343 "test needs root privilege and more than one user")
1344 def test_chown_with_root(self):
1345 uid_1, uid_2 = all_users[:2]
1346 gid = os.stat(support.TESTFN).st_gid
1347 os.chown(support.TESTFN, uid_1, gid)
1348 uid = os.stat(support.TESTFN).st_uid
1349 self.assertEqual(uid, uid_1)
1350 os.chown(support.TESTFN, uid_2, gid)
1351 uid = os.stat(support.TESTFN).st_uid
1352 self.assertEqual(uid, uid_2)
1353
1354 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1355 "test needs non-root account and more than one user")
1356 def test_chown_without_permission(self):
1357 uid_1, uid_2 = all_users[:2]
1358 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001359 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001360 os.chown(support.TESTFN, uid_1, gid)
1361 os.chown(support.TESTFN, uid_2, gid)
1362
Berker Peksag036a71b2015-07-21 09:29:48 +03001363 @classmethod
1364 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001365 os.rmdir(support.TESTFN)
1366
1367
Andrew Svetlov405faed2012-12-25 12:18:09 +02001368class RemoveDirsTests(unittest.TestCase):
1369 def setUp(self):
1370 os.makedirs(support.TESTFN)
1371
1372 def tearDown(self):
1373 support.rmtree(support.TESTFN)
1374
1375 def test_remove_all(self):
1376 dira = os.path.join(support.TESTFN, 'dira')
1377 os.mkdir(dira)
1378 dirb = os.path.join(dira, 'dirb')
1379 os.mkdir(dirb)
1380 os.removedirs(dirb)
1381 self.assertFalse(os.path.exists(dirb))
1382 self.assertFalse(os.path.exists(dira))
1383 self.assertFalse(os.path.exists(support.TESTFN))
1384
1385 def test_remove_partial(self):
1386 dira = os.path.join(support.TESTFN, 'dira')
1387 os.mkdir(dira)
1388 dirb = os.path.join(dira, 'dirb')
1389 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001390 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001391 os.removedirs(dirb)
1392 self.assertFalse(os.path.exists(dirb))
1393 self.assertTrue(os.path.exists(dira))
1394 self.assertTrue(os.path.exists(support.TESTFN))
1395
1396 def test_remove_nothing(self):
1397 dira = os.path.join(support.TESTFN, 'dira')
1398 os.mkdir(dira)
1399 dirb = os.path.join(dira, 'dirb')
1400 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001401 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001402 with self.assertRaises(OSError):
1403 os.removedirs(dirb)
1404 self.assertTrue(os.path.exists(dirb))
1405 self.assertTrue(os.path.exists(dira))
1406 self.assertTrue(os.path.exists(support.TESTFN))
1407
1408
Guido van Rossume7ba4952007-06-06 23:52:48 +00001409class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001410 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001411 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001412 f.write(b'hello')
1413 f.close()
1414 with open(os.devnull, 'rb') as f:
1415 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001416
Andrew Svetlov405faed2012-12-25 12:18:09 +02001417
Guido van Rossume7ba4952007-06-06 23:52:48 +00001418class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001419 def test_urandom_length(self):
1420 self.assertEqual(len(os.urandom(0)), 0)
1421 self.assertEqual(len(os.urandom(1)), 1)
1422 self.assertEqual(len(os.urandom(10)), 10)
1423 self.assertEqual(len(os.urandom(100)), 100)
1424 self.assertEqual(len(os.urandom(1000)), 1000)
1425
1426 def test_urandom_value(self):
1427 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001428 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001429 data2 = os.urandom(16)
1430 self.assertNotEqual(data1, data2)
1431
1432 def get_urandom_subprocess(self, count):
1433 code = '\n'.join((
1434 'import os, sys',
1435 'data = os.urandom(%s)' % count,
1436 'sys.stdout.buffer.write(data)',
1437 'sys.stdout.buffer.flush()'))
1438 out = assert_python_ok('-c', code)
1439 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001440 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001441 return stdout
1442
1443 def test_urandom_subprocess(self):
1444 data1 = self.get_urandom_subprocess(16)
1445 data2 = self.get_urandom_subprocess(16)
1446 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001447
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001448
Victor Stinner9b1f4742016-09-06 16:18:52 -07001449@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1450class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001451 @classmethod
1452 def setUpClass(cls):
1453 try:
1454 os.getrandom(1)
1455 except OSError as exc:
1456 if exc.errno == errno.ENOSYS:
1457 # Python compiled on a more recent Linux version
1458 # than the current Linux kernel
1459 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1460 else:
1461 raise
1462
Victor Stinner9b1f4742016-09-06 16:18:52 -07001463 def test_getrandom_type(self):
1464 data = os.getrandom(16)
1465 self.assertIsInstance(data, bytes)
1466 self.assertEqual(len(data), 16)
1467
1468 def test_getrandom0(self):
1469 empty = os.getrandom(0)
1470 self.assertEqual(empty, b'')
1471
1472 def test_getrandom_random(self):
1473 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1474
1475 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1476 # resource /dev/random
1477
1478 def test_getrandom_nonblock(self):
1479 # The call must not fail. Check also that the flag exists
1480 try:
1481 os.getrandom(1, os.GRND_NONBLOCK)
1482 except BlockingIOError:
1483 # System urandom is not initialized yet
1484 pass
1485
1486 def test_getrandom_value(self):
1487 data1 = os.getrandom(16)
1488 data2 = os.getrandom(16)
1489 self.assertNotEqual(data1, data2)
1490
1491
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001492# os.urandom() doesn't use a file descriptor when it is implemented with the
1493# getentropy() function, the getrandom() function or the getrandom() syscall
1494OS_URANDOM_DONT_USE_FD = (
1495 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1496 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1497 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001498
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001499@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1500 "os.random() does not use a file descriptor")
pxinwrf2d7ac72019-05-21 18:46:37 +08001501@unittest.skipIf(sys.platform == "vxworks",
1502 "VxWorks can't set RLIMIT_NOFILE to 1")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001503class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001504 @unittest.skipUnless(resource, "test requires the resource module")
1505 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001506 # Check urandom() failing when it is not able to open /dev/random.
1507 # We spawn a new process to make the test more robust (if getrlimit()
1508 # failed to restore the file descriptor limit after this, the whole
1509 # test suite would crash; this actually happened on the OS X Tiger
1510 # buildbot).
1511 code = """if 1:
1512 import errno
1513 import os
1514 import resource
1515
1516 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1517 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1518 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001519 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001520 except OSError as e:
1521 assert e.errno == errno.EMFILE, e.errno
1522 else:
1523 raise AssertionError("OSError not raised")
1524 """
1525 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001526
Antoine Pitroue472aea2014-04-26 14:33:03 +02001527 def test_urandom_fd_closed(self):
1528 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1529 # closed.
1530 code = """if 1:
1531 import os
1532 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001533 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001534 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001535 with test.support.SuppressCrashReport():
1536 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001537 sys.stdout.buffer.write(os.urandom(4))
1538 """
1539 rc, out, err = assert_python_ok('-Sc', code)
1540
1541 def test_urandom_fd_reopened(self):
1542 # Issue #21207: urandom() should detect its fd to /dev/urandom
1543 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001544 self.addCleanup(support.unlink, support.TESTFN)
1545 create_file(support.TESTFN, b"x" * 256)
1546
Antoine Pitroue472aea2014-04-26 14:33:03 +02001547 code = """if 1:
1548 import os
1549 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001550 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001551 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001552 with test.support.SuppressCrashReport():
1553 for fd in range(3, 256):
1554 try:
1555 os.close(fd)
1556 except OSError:
1557 pass
1558 else:
1559 # Found the urandom fd (XXX hopefully)
1560 break
1561 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001562 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001563 new_fd = f.fileno()
1564 # Issue #26935: posix allows new_fd and fd to be equal but
1565 # some libc implementations have dup2 return an error in this
1566 # case.
1567 if new_fd != fd:
1568 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001569 sys.stdout.buffer.write(os.urandom(4))
1570 sys.stdout.buffer.write(os.urandom(4))
1571 """.format(TESTFN=support.TESTFN)
1572 rc, out, err = assert_python_ok('-Sc', code)
1573 self.assertEqual(len(out), 8)
1574 self.assertNotEqual(out[0:4], out[4:8])
1575 rc, out2, err2 = assert_python_ok('-Sc', code)
1576 self.assertEqual(len(out2), 8)
1577 self.assertNotEqual(out2, out)
1578
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001579
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001580@contextlib.contextmanager
1581def _execvpe_mockup(defpath=None):
1582 """
1583 Stubs out execv and execve functions when used as context manager.
1584 Records exec calls. The mock execv and execve functions always raise an
1585 exception as they would normally never return.
1586 """
1587 # A list of tuples containing (function name, first arg, args)
1588 # of calls to execv or execve that have been made.
1589 calls = []
1590
1591 def mock_execv(name, *args):
1592 calls.append(('execv', name, args))
1593 raise RuntimeError("execv called")
1594
1595 def mock_execve(name, *args):
1596 calls.append(('execve', name, args))
1597 raise OSError(errno.ENOTDIR, "execve called")
1598
1599 try:
1600 orig_execv = os.execv
1601 orig_execve = os.execve
1602 orig_defpath = os.defpath
1603 os.execv = mock_execv
1604 os.execve = mock_execve
1605 if defpath is not None:
1606 os.defpath = defpath
1607 yield calls
1608 finally:
1609 os.execv = orig_execv
1610 os.execve = orig_execve
1611 os.defpath = orig_defpath
1612
pxinwrf2d7ac72019-05-21 18:46:37 +08001613@unittest.skipUnless(hasattr(os, 'execv'),
1614 "need os.execv()")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001615class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001616 @unittest.skipIf(USING_LINUXTHREADS,
1617 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001618 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001619 self.assertRaises(OSError, os.execvpe, 'no such app-',
1620 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001621
Steve Dowerbce26262016-11-19 19:17:26 -08001622 def test_execv_with_bad_arglist(self):
1623 self.assertRaises(ValueError, os.execv, 'notepad', ())
1624 self.assertRaises(ValueError, os.execv, 'notepad', [])
1625 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1626 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1627
Thomas Heller6790d602007-08-30 17:15:14 +00001628 def test_execvpe_with_bad_arglist(self):
1629 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001630 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1631 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001632
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001633 @unittest.skipUnless(hasattr(os, '_execvpe'),
1634 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001635 def _test_internal_execvpe(self, test_type):
1636 program_path = os.sep + 'absolutepath'
1637 if test_type is bytes:
1638 program = b'executable'
1639 fullpath = os.path.join(os.fsencode(program_path), program)
1640 native_fullpath = fullpath
1641 arguments = [b'progname', 'arg1', 'arg2']
1642 else:
1643 program = 'executable'
1644 arguments = ['progname', 'arg1', 'arg2']
1645 fullpath = os.path.join(program_path, program)
1646 if os.name != "nt":
1647 native_fullpath = os.fsencode(fullpath)
1648 else:
1649 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001650 env = {'spam': 'beans'}
1651
Victor Stinnerb745a742010-05-18 17:17:23 +00001652 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001653 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001654 self.assertRaises(RuntimeError,
1655 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001656 self.assertEqual(len(calls), 1)
1657 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1658
Victor Stinnerb745a742010-05-18 17:17:23 +00001659 # test os._execvpe() with a relative path:
1660 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001661 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001662 self.assertRaises(OSError,
1663 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001664 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001665 self.assertSequenceEqual(calls[0],
1666 ('execve', native_fullpath, (arguments, env)))
1667
1668 # test os._execvpe() with a relative path:
1669 # os.get_exec_path() reads the 'PATH' variable
1670 with _execvpe_mockup() as calls:
1671 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001672 if test_type is bytes:
1673 env_path[b'PATH'] = program_path
1674 else:
1675 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001676 self.assertRaises(OSError,
1677 os._execvpe, program, arguments, env=env_path)
1678 self.assertEqual(len(calls), 1)
1679 self.assertSequenceEqual(calls[0],
1680 ('execve', native_fullpath, (arguments, env_path)))
1681
1682 def test_internal_execvpe_str(self):
1683 self._test_internal_execvpe(str)
1684 if os.name != "nt":
1685 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001686
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001687 def test_execve_invalid_env(self):
1688 args = [sys.executable, '-c', 'pass']
1689
Ville Skyttä49b27342017-08-03 09:00:59 +03001690 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001691 newenv = os.environ.copy()
1692 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1693 with self.assertRaises(ValueError):
1694 os.execve(args[0], args, newenv)
1695
Ville Skyttä49b27342017-08-03 09:00:59 +03001696 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001697 newenv = os.environ.copy()
1698 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1699 with self.assertRaises(ValueError):
1700 os.execve(args[0], args, newenv)
1701
Ville Skyttä49b27342017-08-03 09:00:59 +03001702 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001703 newenv = os.environ.copy()
1704 newenv["FRUIT=ORANGE"] = "lemon"
1705 with self.assertRaises(ValueError):
1706 os.execve(args[0], args, newenv)
1707
Alexey Izbyshev83460312018-10-20 03:28:22 +03001708 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1709 def test_execve_with_empty_path(self):
1710 # bpo-32890: Check GetLastError() misuse
1711 try:
1712 os.execve('', ['arg'], {})
1713 except OSError as e:
1714 self.assertTrue(e.winerror is None or e.winerror != 0)
1715 else:
1716 self.fail('No OSError raised')
1717
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001718
Serhiy Storchaka43767632013-11-03 21:31:38 +02001719@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001720class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001721 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001722 try:
1723 os.stat(support.TESTFN)
1724 except FileNotFoundError:
1725 exists = False
1726 except OSError as exc:
1727 exists = True
1728 self.fail("file %s must not exist; os.stat failed with %s"
1729 % (support.TESTFN, exc))
1730 else:
1731 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001732
Thomas Wouters477c8d52006-05-27 19:21:47 +00001733 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001734 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001735
1736 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001737 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001738
1739 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001740 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001741
1742 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001743 self.addCleanup(support.unlink, support.TESTFN)
1744
Victor Stinnere77c9742016-03-25 10:28:23 +01001745 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001746 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001747
1748 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001749 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001750
Thomas Wouters477c8d52006-05-27 19:21:47 +00001751 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001752 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001753
Victor Stinnere77c9742016-03-25 10:28:23 +01001754
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001755class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001756 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001757 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1758 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001759 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001760 def get_single(f):
1761 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001762 if hasattr(os, f):
1763 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001764 return helper
1765 for f in singles:
1766 locals()["test_"+f] = get_single(f)
1767
Benjamin Peterson7522c742009-01-19 21:00:09 +00001768 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001769 try:
1770 f(support.make_bad_fd(), *args)
1771 except OSError as e:
1772 self.assertEqual(e.errno, errno.EBADF)
1773 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001774 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001775 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001776
Serhiy Storchaka43767632013-11-03 21:31:38 +02001777 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001778 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001779 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001780
Serhiy Storchaka43767632013-11-03 21:31:38 +02001781 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001782 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001783 fd = support.make_bad_fd()
1784 # Make sure none of the descriptors we are about to close are
1785 # currently valid (issue 6542).
1786 for i in range(10):
1787 try: os.fstat(fd+i)
1788 except OSError:
1789 pass
1790 else:
1791 break
1792 if i < 2:
1793 raise unittest.SkipTest(
1794 "Unable to acquire a range of invalid file descriptors")
1795 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001796
Serhiy Storchaka43767632013-11-03 21:31:38 +02001797 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001798 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001799 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001800
Serhiy Storchaka43767632013-11-03 21:31:38 +02001801 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001802 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001803 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001804
Serhiy Storchaka43767632013-11-03 21:31:38 +02001805 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001806 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001807 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001808
Serhiy Storchaka43767632013-11-03 21:31:38 +02001809 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001810 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001811 self.check(os.pathconf, "PC_NAME_MAX")
1812 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001813
Serhiy Storchaka43767632013-11-03 21:31:38 +02001814 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001815 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001816 self.check(os.truncate, 0)
1817 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001818
Serhiy Storchaka43767632013-11-03 21:31:38 +02001819 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001820 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001821 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001822
Serhiy Storchaka43767632013-11-03 21:31:38 +02001823 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001824 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001825 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001826
Victor Stinner57ddf782014-01-08 15:21:28 +01001827 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1828 def test_readv(self):
1829 buf = bytearray(10)
1830 self.check(os.readv, [buf])
1831
Serhiy Storchaka43767632013-11-03 21:31:38 +02001832 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001833 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001834 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001835
Serhiy Storchaka43767632013-11-03 21:31:38 +02001836 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001837 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001838 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001839
Victor Stinner57ddf782014-01-08 15:21:28 +01001840 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1841 def test_writev(self):
1842 self.check(os.writev, [b'abc'])
1843
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001844 def test_inheritable(self):
1845 self.check(os.get_inheritable)
1846 self.check(os.set_inheritable, True)
1847
1848 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1849 'needs os.get_blocking() and os.set_blocking()')
1850 def test_blocking(self):
1851 self.check(os.get_blocking)
1852 self.check(os.set_blocking, True)
1853
Brian Curtin1b9df392010-11-24 20:24:31 +00001854
1855class LinkTests(unittest.TestCase):
1856 def setUp(self):
1857 self.file1 = support.TESTFN
1858 self.file2 = os.path.join(support.TESTFN + "2")
1859
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001860 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001861 for file in (self.file1, self.file2):
1862 if os.path.exists(file):
1863 os.unlink(file)
1864
Brian Curtin1b9df392010-11-24 20:24:31 +00001865 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001866 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001867
xdegaye6a55d092017-11-12 17:57:04 +01001868 try:
1869 os.link(file1, file2)
1870 except PermissionError as e:
1871 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001872 with open(file1, "r") as f1, open(file2, "r") as f2:
1873 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1874
1875 def test_link(self):
1876 self._test_link(self.file1, self.file2)
1877
1878 def test_link_bytes(self):
1879 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1880 bytes(self.file2, sys.getfilesystemencoding()))
1881
Brian Curtinf498b752010-11-30 15:54:04 +00001882 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001883 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001884 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001885 except UnicodeError:
1886 raise unittest.SkipTest("Unable to encode for this platform.")
1887
Brian Curtinf498b752010-11-30 15:54:04 +00001888 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001889 self.file2 = self.file1 + "2"
1890 self._test_link(self.file1, self.file2)
1891
Serhiy Storchaka43767632013-11-03 21:31:38 +02001892@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1893class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001894 # uid_t and gid_t are 32-bit unsigned integers on Linux
1895 UID_OVERFLOW = (1 << 32)
1896 GID_OVERFLOW = (1 << 32)
1897
Serhiy Storchaka43767632013-11-03 21:31:38 +02001898 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1899 def test_setuid(self):
1900 if os.getuid() != 0:
1901 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001902 self.assertRaises(TypeError, os.setuid, 'not an int')
1903 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001904
Serhiy Storchaka43767632013-11-03 21:31:38 +02001905 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1906 def test_setgid(self):
1907 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1908 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001909 self.assertRaises(TypeError, os.setgid, 'not an int')
1910 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001911
Serhiy Storchaka43767632013-11-03 21:31:38 +02001912 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1913 def test_seteuid(self):
1914 if os.getuid() != 0:
1915 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001916 self.assertRaises(TypeError, os.setegid, 'not an int')
1917 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001918
Serhiy Storchaka43767632013-11-03 21:31:38 +02001919 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1920 def test_setegid(self):
1921 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1922 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001923 self.assertRaises(TypeError, os.setegid, 'not an int')
1924 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001925
Serhiy Storchaka43767632013-11-03 21:31:38 +02001926 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1927 def test_setreuid(self):
1928 if os.getuid() != 0:
1929 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001930 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1931 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1932 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1933 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001934
Serhiy Storchaka43767632013-11-03 21:31:38 +02001935 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1936 def test_setreuid_neg1(self):
1937 # Needs to accept -1. We run this in a subprocess to avoid
1938 # altering the test runner's process state (issue8045).
1939 subprocess.check_call([
1940 sys.executable, '-c',
1941 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001942
Serhiy Storchaka43767632013-11-03 21:31:38 +02001943 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1944 def test_setregid(self):
1945 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1946 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001947 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
1948 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
1949 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
1950 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001951
Serhiy Storchaka43767632013-11-03 21:31:38 +02001952 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1953 def test_setregid_neg1(self):
1954 # Needs to accept -1. We run this in a subprocess to avoid
1955 # altering the test runner's process state (issue8045).
1956 subprocess.check_call([
1957 sys.executable, '-c',
1958 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001959
Serhiy Storchaka43767632013-11-03 21:31:38 +02001960@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1961class Pep383Tests(unittest.TestCase):
1962 def setUp(self):
1963 if support.TESTFN_UNENCODABLE:
1964 self.dir = support.TESTFN_UNENCODABLE
1965 elif support.TESTFN_NONASCII:
1966 self.dir = support.TESTFN_NONASCII
1967 else:
1968 self.dir = support.TESTFN
1969 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001970
Serhiy Storchaka43767632013-11-03 21:31:38 +02001971 bytesfn = []
1972 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001973 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001974 fn = os.fsencode(fn)
1975 except UnicodeEncodeError:
1976 return
1977 bytesfn.append(fn)
1978 add_filename(support.TESTFN_UNICODE)
1979 if support.TESTFN_UNENCODABLE:
1980 add_filename(support.TESTFN_UNENCODABLE)
1981 if support.TESTFN_NONASCII:
1982 add_filename(support.TESTFN_NONASCII)
1983 if not bytesfn:
1984 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001985
Serhiy Storchaka43767632013-11-03 21:31:38 +02001986 self.unicodefn = set()
1987 os.mkdir(self.dir)
1988 try:
1989 for fn in bytesfn:
1990 support.create_empty_file(os.path.join(self.bdir, fn))
1991 fn = os.fsdecode(fn)
1992 if fn in self.unicodefn:
1993 raise ValueError("duplicate filename")
1994 self.unicodefn.add(fn)
1995 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001996 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001997 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001998
Serhiy Storchaka43767632013-11-03 21:31:38 +02001999 def tearDown(self):
2000 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002001
Serhiy Storchaka43767632013-11-03 21:31:38 +02002002 def test_listdir(self):
2003 expected = self.unicodefn
2004 found = set(os.listdir(self.dir))
2005 self.assertEqual(found, expected)
2006 # test listdir without arguments
2007 current_directory = os.getcwd()
2008 try:
2009 os.chdir(os.sep)
2010 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2011 finally:
2012 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00002013
Serhiy Storchaka43767632013-11-03 21:31:38 +02002014 def test_open(self):
2015 for fn in self.unicodefn:
2016 f = open(os.path.join(self.dir, fn), 'rb')
2017 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01002018
Serhiy Storchaka43767632013-11-03 21:31:38 +02002019 @unittest.skipUnless(hasattr(os, 'statvfs'),
2020 "need os.statvfs()")
2021 def test_statvfs(self):
2022 # issue #9645
2023 for fn in self.unicodefn:
2024 # should not fail with file not found error
2025 fullname = os.path.join(self.dir, fn)
2026 os.statvfs(fullname)
2027
2028 def test_stat(self):
2029 for fn in self.unicodefn:
2030 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002031
Brian Curtineb24d742010-04-12 17:16:38 +00002032@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2033class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00002034 def _kill(self, sig):
2035 # Start sys.executable as a subprocess and communicate from the
2036 # subprocess to the parent that the interpreter is ready. When it
2037 # becomes ready, send *sig* via os.kill to the subprocess and check
2038 # that the return code is equal to *sig*.
2039 import ctypes
2040 from ctypes import wintypes
2041 import msvcrt
2042
2043 # Since we can't access the contents of the process' stdout until the
2044 # process has exited, use PeekNamedPipe to see what's inside stdout
2045 # without waiting. This is done so we can tell that the interpreter
2046 # is started and running at a point where it could handle a signal.
2047 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2048 PeekNamedPipe.restype = wintypes.BOOL
2049 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2050 ctypes.POINTER(ctypes.c_char), # stdout buf
2051 wintypes.DWORD, # Buffer size
2052 ctypes.POINTER(wintypes.DWORD), # bytes read
2053 ctypes.POINTER(wintypes.DWORD), # bytes avail
2054 ctypes.POINTER(wintypes.DWORD)) # bytes left
2055 msg = "running"
2056 proc = subprocess.Popen([sys.executable, "-c",
2057 "import sys;"
2058 "sys.stdout.write('{}');"
2059 "sys.stdout.flush();"
2060 "input()".format(msg)],
2061 stdout=subprocess.PIPE,
2062 stderr=subprocess.PIPE,
2063 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00002064 self.addCleanup(proc.stdout.close)
2065 self.addCleanup(proc.stderr.close)
2066 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00002067
2068 count, max = 0, 100
2069 while count < max and proc.poll() is None:
2070 # Create a string buffer to store the result of stdout from the pipe
2071 buf = ctypes.create_string_buffer(len(msg))
2072 # Obtain the text currently in proc.stdout
2073 # Bytes read/avail/left are left as NULL and unused
2074 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2075 buf, ctypes.sizeof(buf), None, None, None)
2076 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2077 if buf.value:
2078 self.assertEqual(msg, buf.value.decode())
2079 break
2080 time.sleep(0.1)
2081 count += 1
2082 else:
2083 self.fail("Did not receive communication from the subprocess")
2084
Brian Curtineb24d742010-04-12 17:16:38 +00002085 os.kill(proc.pid, sig)
2086 self.assertEqual(proc.wait(), sig)
2087
2088 def test_kill_sigterm(self):
2089 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00002090 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002091
2092 def test_kill_int(self):
2093 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002094 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002095
2096 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002097 tagname = "test_os_%s" % uuid.uuid1()
2098 m = mmap.mmap(-1, 1, tagname)
2099 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002100 # Run a script which has console control handling enabled.
2101 proc = subprocess.Popen([sys.executable,
2102 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002103 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002104 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2105 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002106 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002107 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002108 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002109 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002110 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002111 count += 1
2112 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002113 # Forcefully kill the process if we weren't able to signal it.
2114 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002115 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002116 os.kill(proc.pid, event)
2117 # proc.send_signal(event) could also be done here.
2118 # Allow time for the signal to be passed and the process to exit.
2119 time.sleep(0.5)
2120 if not proc.poll():
2121 # Forcefully kill the process if we weren't able to signal it.
2122 os.kill(proc.pid, signal.SIGINT)
2123 self.fail("subprocess did not stop on {}".format(name))
2124
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002125 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002126 def test_CTRL_C_EVENT(self):
2127 from ctypes import wintypes
2128 import ctypes
2129
2130 # Make a NULL value by creating a pointer with no argument.
2131 NULL = ctypes.POINTER(ctypes.c_int)()
2132 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2133 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2134 wintypes.BOOL)
2135 SetConsoleCtrlHandler.restype = wintypes.BOOL
2136
2137 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002138 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002139 # by subprocesses.
2140 SetConsoleCtrlHandler(NULL, 0)
2141
2142 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2143
2144 def test_CTRL_BREAK_EVENT(self):
2145 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2146
2147
Brian Curtind40e6f72010-07-08 21:39:08 +00002148@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002149class Win32ListdirTests(unittest.TestCase):
2150 """Test listdir on Windows."""
2151
2152 def setUp(self):
2153 self.created_paths = []
2154 for i in range(2):
2155 dir_name = 'SUB%d' % i
2156 dir_path = os.path.join(support.TESTFN, dir_name)
2157 file_name = 'FILE%d' % i
2158 file_path = os.path.join(support.TESTFN, file_name)
2159 os.makedirs(dir_path)
2160 with open(file_path, 'w') as f:
2161 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2162 self.created_paths.extend([dir_name, file_name])
2163 self.created_paths.sort()
2164
2165 def tearDown(self):
2166 shutil.rmtree(support.TESTFN)
2167
2168 def test_listdir_no_extended_path(self):
2169 """Test when the path is not an "extended" path."""
2170 # unicode
2171 self.assertEqual(
2172 sorted(os.listdir(support.TESTFN)),
2173 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002174
Tim Golden781bbeb2013-10-25 20:24:06 +01002175 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002176 self.assertEqual(
2177 sorted(os.listdir(os.fsencode(support.TESTFN))),
2178 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002179
2180 def test_listdir_extended_path(self):
2181 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002182 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002183 # unicode
2184 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2185 self.assertEqual(
2186 sorted(os.listdir(path)),
2187 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002188
Tim Golden781bbeb2013-10-25 20:24:06 +01002189 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002190 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2191 self.assertEqual(
2192 sorted(os.listdir(path)),
2193 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002194
2195
Berker Peksage0b5b202018-08-15 13:03:41 +03002196@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2197class ReadlinkTests(unittest.TestCase):
2198 filelink = 'readlinktest'
2199 filelink_target = os.path.abspath(__file__)
2200 filelinkb = os.fsencode(filelink)
2201 filelinkb_target = os.fsencode(filelink_target)
2202
2203 def setUp(self):
2204 self.assertTrue(os.path.exists(self.filelink_target))
2205 self.assertTrue(os.path.exists(self.filelinkb_target))
2206 self.assertFalse(os.path.exists(self.filelink))
2207 self.assertFalse(os.path.exists(self.filelinkb))
2208
2209 def test_not_symlink(self):
2210 filelink_target = FakePath(self.filelink_target)
2211 self.assertRaises(OSError, os.readlink, self.filelink_target)
2212 self.assertRaises(OSError, os.readlink, filelink_target)
2213
2214 def test_missing_link(self):
2215 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2216 self.assertRaises(FileNotFoundError, os.readlink,
2217 FakePath('missing-link'))
2218
2219 @support.skip_unless_symlink
2220 def test_pathlike(self):
2221 os.symlink(self.filelink_target, self.filelink)
2222 self.addCleanup(support.unlink, self.filelink)
2223 filelink = FakePath(self.filelink)
2224 self.assertEqual(os.readlink(filelink), self.filelink_target)
2225
2226 @support.skip_unless_symlink
2227 def test_pathlike_bytes(self):
2228 os.symlink(self.filelinkb_target, self.filelinkb)
2229 self.addCleanup(support.unlink, self.filelinkb)
2230 path = os.readlink(FakePath(self.filelinkb))
2231 self.assertEqual(path, self.filelinkb_target)
2232 self.assertIsInstance(path, bytes)
2233
2234 @support.skip_unless_symlink
2235 def test_bytes(self):
2236 os.symlink(self.filelinkb_target, self.filelinkb)
2237 self.addCleanup(support.unlink, self.filelinkb)
2238 path = os.readlink(self.filelinkb)
2239 self.assertEqual(path, self.filelinkb_target)
2240 self.assertIsInstance(path, bytes)
2241
2242
Tim Golden781bbeb2013-10-25 20:24:06 +01002243@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002244@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002245class Win32SymlinkTests(unittest.TestCase):
2246 filelink = 'filelinktest'
2247 filelink_target = os.path.abspath(__file__)
2248 dirlink = 'dirlinktest'
2249 dirlink_target = os.path.dirname(filelink_target)
2250 missing_link = 'missing link'
2251
2252 def setUp(self):
2253 assert os.path.exists(self.dirlink_target)
2254 assert os.path.exists(self.filelink_target)
2255 assert not os.path.exists(self.dirlink)
2256 assert not os.path.exists(self.filelink)
2257 assert not os.path.exists(self.missing_link)
2258
2259 def tearDown(self):
2260 if os.path.exists(self.filelink):
2261 os.remove(self.filelink)
2262 if os.path.exists(self.dirlink):
2263 os.rmdir(self.dirlink)
2264 if os.path.lexists(self.missing_link):
2265 os.remove(self.missing_link)
2266
2267 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002268 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002269 self.assertTrue(os.path.exists(self.dirlink))
2270 self.assertTrue(os.path.isdir(self.dirlink))
2271 self.assertTrue(os.path.islink(self.dirlink))
2272 self.check_stat(self.dirlink, self.dirlink_target)
2273
2274 def test_file_link(self):
2275 os.symlink(self.filelink_target, self.filelink)
2276 self.assertTrue(os.path.exists(self.filelink))
2277 self.assertTrue(os.path.isfile(self.filelink))
2278 self.assertTrue(os.path.islink(self.filelink))
2279 self.check_stat(self.filelink, self.filelink_target)
2280
2281 def _create_missing_dir_link(self):
2282 'Create a "directory" link to a non-existent target'
2283 linkname = self.missing_link
2284 if os.path.lexists(linkname):
2285 os.remove(linkname)
2286 target = r'c:\\target does not exist.29r3c740'
2287 assert not os.path.exists(target)
2288 target_is_dir = True
2289 os.symlink(target, linkname, target_is_dir)
2290
2291 def test_remove_directory_link_to_missing_target(self):
2292 self._create_missing_dir_link()
2293 # For compatibility with Unix, os.remove will check the
2294 # directory status and call RemoveDirectory if the symlink
2295 # was created with target_is_dir==True.
2296 os.remove(self.missing_link)
2297
2298 @unittest.skip("currently fails; consider for improvement")
2299 def test_isdir_on_directory_link_to_missing_target(self):
2300 self._create_missing_dir_link()
2301 # consider having isdir return true for directory links
2302 self.assertTrue(os.path.isdir(self.missing_link))
2303
2304 @unittest.skip("currently fails; consider for improvement")
2305 def test_rmdir_on_directory_link_to_missing_target(self):
2306 self._create_missing_dir_link()
2307 # consider allowing rmdir to remove directory links
2308 os.rmdir(self.missing_link)
2309
2310 def check_stat(self, link, target):
2311 self.assertEqual(os.stat(link), os.stat(target))
2312 self.assertNotEqual(os.lstat(link), os.stat(link))
2313
Brian Curtind25aef52011-06-13 15:16:04 -05002314 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002315 self.assertEqual(os.stat(bytes_link), os.stat(target))
2316 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002317
2318 def test_12084(self):
2319 level1 = os.path.abspath(support.TESTFN)
2320 level2 = os.path.join(level1, "level2")
2321 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002322 self.addCleanup(support.rmtree, level1)
2323
2324 os.mkdir(level1)
2325 os.mkdir(level2)
2326 os.mkdir(level3)
2327
2328 file1 = os.path.abspath(os.path.join(level1, "file1"))
2329 create_file(file1)
2330
2331 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002332 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002333 os.chdir(level2)
2334 link = os.path.join(level2, "link")
2335 os.symlink(os.path.relpath(file1), "link")
2336 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002337
Victor Stinnerae39d232016-03-24 17:12:55 +01002338 # Check os.stat calls from the same dir as the link
2339 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002340
Victor Stinnerae39d232016-03-24 17:12:55 +01002341 # Check os.stat calls from a dir below the link
2342 os.chdir(level1)
2343 self.assertEqual(os.stat(file1),
2344 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002345
Victor Stinnerae39d232016-03-24 17:12:55 +01002346 # Check os.stat calls from a dir above the link
2347 os.chdir(level3)
2348 self.assertEqual(os.stat(file1),
2349 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002350 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002351 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002352
SSE43c34aad2018-02-13 00:10:35 +07002353 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2354 and os.path.exists(r'C:\ProgramData'),
2355 'Test directories not found')
2356 def test_29248(self):
2357 # os.symlink() calls CreateSymbolicLink, which creates
2358 # the reparse data buffer with the print name stored
2359 # first, so the offset is always 0. CreateSymbolicLink
2360 # stores the "PrintName" DOS path (e.g. "C:\") first,
2361 # with an offset of 0, followed by the "SubstituteName"
2362 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2363 # the other hand, seems to have been created manually
2364 # with an inverted order.
2365 target = os.readlink(r'C:\Users\All Users')
2366 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2367
Steve Dower6921e732018-03-05 14:26:08 -08002368 def test_buffer_overflow(self):
2369 # Older versions would have a buffer overflow when detecting
2370 # whether a link source was a directory. This test ensures we
2371 # no longer crash, but does not otherwise validate the behavior
2372 segment = 'X' * 27
2373 path = os.path.join(*[segment] * 10)
2374 test_cases = [
2375 # overflow with absolute src
2376 ('\\' + path, segment),
2377 # overflow dest with relative src
2378 (segment, path),
2379 # overflow when joining src
2380 (path[:180], path[:180]),
2381 ]
2382 for src, dest in test_cases:
2383 try:
2384 os.symlink(src, dest)
2385 except FileNotFoundError:
2386 pass
2387 else:
2388 try:
2389 os.remove(dest)
2390 except OSError:
2391 pass
2392 # Also test with bytes, since that is a separate code path.
2393 try:
2394 os.symlink(os.fsencode(src), os.fsencode(dest))
2395 except FileNotFoundError:
2396 pass
2397 else:
2398 try:
2399 os.remove(dest)
2400 except OSError:
2401 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002402
Tim Golden0321cf22014-05-05 19:46:17 +01002403@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2404class Win32JunctionTests(unittest.TestCase):
2405 junction = 'junctiontest'
2406 junction_target = os.path.dirname(os.path.abspath(__file__))
2407
2408 def setUp(self):
2409 assert os.path.exists(self.junction_target)
2410 assert not os.path.exists(self.junction)
2411
2412 def tearDown(self):
2413 if os.path.exists(self.junction):
2414 # os.rmdir delegates to Windows' RemoveDirectoryW,
2415 # which removes junction points safely.
2416 os.rmdir(self.junction)
2417
2418 def test_create_junction(self):
2419 _winapi.CreateJunction(self.junction_target, self.junction)
2420 self.assertTrue(os.path.exists(self.junction))
2421 self.assertTrue(os.path.isdir(self.junction))
2422
2423 # Junctions are not recognized as links.
2424 self.assertFalse(os.path.islink(self.junction))
2425
2426 def test_unlink_removes_junction(self):
2427 _winapi.CreateJunction(self.junction_target, self.junction)
2428 self.assertTrue(os.path.exists(self.junction))
2429
2430 os.unlink(self.junction)
2431 self.assertFalse(os.path.exists(self.junction))
2432
Mark Becwarb82bfac2019-02-02 16:08:23 -05002433@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2434class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002435 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002436 nt = support.import_module('nt')
2437 ctypes = support.import_module('ctypes')
2438 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002439
2440 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2441 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2442
2443 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2444 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2445 ctypes.wintypes.LPDWORD)
2446
2447 # This is a pseudo-handle that doesn't need to be closed
2448 hproc = kernel.GetCurrentProcess()
2449
2450 handle_count = ctypes.wintypes.DWORD()
2451 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2452 self.assertEqual(1, ok)
2453
2454 before_count = handle_count.value
2455
2456 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002457 filenames = [
2458 r'\\?\C:',
2459 r'\\?\NUL',
2460 r'\\?\CONIN',
2461 __file__,
2462 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002463
Berker Peksag6ef726a2019-04-22 18:46:28 +03002464 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002465 for name in filenames:
2466 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002467 nt._getfinalpathname(name)
2468 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002469 # Failure is expected
2470 pass
2471 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002472 os.stat(name)
2473 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002474 pass
2475
2476 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2477 self.assertEqual(1, ok)
2478
2479 handle_delta = handle_count.value - before_count
2480
2481 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002482
Jason R. Coombs3a092862013-05-27 23:21:28 -04002483@support.skip_unless_symlink
2484class NonLocalSymlinkTests(unittest.TestCase):
2485
2486 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002487 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002488 Create this structure:
2489
2490 base
2491 \___ some_dir
2492 """
2493 os.makedirs('base/some_dir')
2494
2495 def tearDown(self):
2496 shutil.rmtree('base')
2497
2498 def test_directory_link_nonlocal(self):
2499 """
2500 The symlink target should resolve relative to the link, not relative
2501 to the current directory.
2502
2503 Then, link base/some_link -> base/some_dir and ensure that some_link
2504 is resolved as a directory.
2505
2506 In issue13772, it was discovered that directory detection failed if
2507 the symlink target was not specified relative to the current
2508 directory, which was a defect in the implementation.
2509 """
2510 src = os.path.join('base', 'some_link')
2511 os.symlink('some_dir', src)
2512 assert os.path.isdir(src)
2513
2514
Victor Stinnere8d51452010-08-19 01:05:19 +00002515class FSEncodingTests(unittest.TestCase):
2516 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002517 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2518 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002519
Victor Stinnere8d51452010-08-19 01:05:19 +00002520 def test_identity(self):
2521 # assert fsdecode(fsencode(x)) == x
2522 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2523 try:
2524 bytesfn = os.fsencode(fn)
2525 except UnicodeEncodeError:
2526 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002527 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002528
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002529
Brett Cannonefb00c02012-02-29 18:31:31 -05002530
2531class DeviceEncodingTests(unittest.TestCase):
2532
2533 def test_bad_fd(self):
2534 # Return None when an fd doesn't actually exist.
2535 self.assertIsNone(os.device_encoding(123456))
2536
Paul Monson62dfd7d2019-04-25 11:36:45 -07002537 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002538 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002539 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002540 def test_device_encoding(self):
2541 encoding = os.device_encoding(0)
2542 self.assertIsNotNone(encoding)
2543 self.assertTrue(codecs.lookup(encoding))
2544
2545
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002546class PidTests(unittest.TestCase):
2547 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2548 def test_getppid(self):
2549 p = subprocess.Popen([sys.executable, '-c',
2550 'import os; print(os.getppid())'],
2551 stdout=subprocess.PIPE)
2552 stdout, _ = p.communicate()
2553 # We are the parent of our subprocess
2554 self.assertEqual(int(stdout), os.getpid())
2555
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002556 def test_waitpid(self):
2557 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002558 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002559 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002560 status = os.waitpid(pid, 0)
2561 self.assertEqual(status, (pid, 0))
2562
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002563
Victor Stinner4659ccf2016-09-14 10:57:00 +02002564class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002565 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002566 self.exitcode = 17
2567
2568 filename = support.TESTFN
2569 self.addCleanup(support.unlink, filename)
2570
2571 if not with_env:
2572 code = 'import sys; sys.exit(%s)' % self.exitcode
2573 else:
2574 self.env = dict(os.environ)
2575 # create an unique key
2576 self.key = str(uuid.uuid4())
2577 self.env[self.key] = self.key
2578 # read the variable from os.environ to check that it exists
2579 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2580 % (self.key, self.exitcode))
2581
2582 with open(filename, "w") as fp:
2583 fp.write(code)
2584
Berker Peksag81816462016-09-15 20:19:47 +03002585 args = [sys.executable, filename]
2586 if use_bytes:
2587 args = [os.fsencode(a) for a in args]
2588 self.env = {os.fsencode(k): os.fsencode(v)
2589 for k, v in self.env.items()}
2590
2591 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002592
Berker Peksag4af23d72016-09-15 20:32:44 +03002593 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002594 def test_spawnl(self):
2595 args = self.create_args()
2596 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2597 self.assertEqual(exitcode, self.exitcode)
2598
Berker Peksag4af23d72016-09-15 20:32:44 +03002599 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002600 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002601 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002602 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2603 self.assertEqual(exitcode, self.exitcode)
2604
Berker Peksag4af23d72016-09-15 20:32:44 +03002605 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002606 def test_spawnlp(self):
2607 args = self.create_args()
2608 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2609 self.assertEqual(exitcode, self.exitcode)
2610
Berker Peksag4af23d72016-09-15 20:32:44 +03002611 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002612 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002613 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002614 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2615 self.assertEqual(exitcode, self.exitcode)
2616
Berker Peksag4af23d72016-09-15 20:32:44 +03002617 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002618 def test_spawnv(self):
2619 args = self.create_args()
2620 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2621 self.assertEqual(exitcode, self.exitcode)
2622
Berker Peksag4af23d72016-09-15 20:32:44 +03002623 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002624 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002625 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002626 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2627 self.assertEqual(exitcode, self.exitcode)
2628
Berker Peksag4af23d72016-09-15 20:32:44 +03002629 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002630 def test_spawnvp(self):
2631 args = self.create_args()
2632 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2633 self.assertEqual(exitcode, self.exitcode)
2634
Berker Peksag4af23d72016-09-15 20:32:44 +03002635 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002636 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002637 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002638 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2639 self.assertEqual(exitcode, self.exitcode)
2640
Berker Peksag4af23d72016-09-15 20:32:44 +03002641 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002642 def test_nowait(self):
2643 args = self.create_args()
2644 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2645 result = os.waitpid(pid, 0)
2646 self.assertEqual(result[0], pid)
2647 status = result[1]
2648 if hasattr(os, 'WIFEXITED'):
2649 self.assertTrue(os.WIFEXITED(status))
2650 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2651 else:
2652 self.assertEqual(status, self.exitcode << 8)
2653
Berker Peksag4af23d72016-09-15 20:32:44 +03002654 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002655 def test_spawnve_bytes(self):
2656 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2657 args = self.create_args(with_env=True, use_bytes=True)
2658 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2659 self.assertEqual(exitcode, self.exitcode)
2660
Steve Dower859fd7b2016-11-19 18:53:19 -08002661 @requires_os_func('spawnl')
2662 def test_spawnl_noargs(self):
2663 args = self.create_args()
2664 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002665 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002666
2667 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002668 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002669 args = self.create_args()
2670 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002671 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002672
2673 @requires_os_func('spawnv')
2674 def test_spawnv_noargs(self):
2675 args = self.create_args()
2676 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2677 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002678 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2679 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002680
2681 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002682 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002683 args = self.create_args()
2684 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2685 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002686 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2687 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002688
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002689 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002690 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002691
Ville Skyttä49b27342017-08-03 09:00:59 +03002692 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002693 newenv = os.environ.copy()
2694 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2695 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002696 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002697 except ValueError:
2698 pass
2699 else:
2700 self.assertEqual(exitcode, 127)
2701
Ville Skyttä49b27342017-08-03 09:00:59 +03002702 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002703 newenv = os.environ.copy()
2704 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2705 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002706 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002707 except ValueError:
2708 pass
2709 else:
2710 self.assertEqual(exitcode, 127)
2711
Ville Skyttä49b27342017-08-03 09:00:59 +03002712 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002713 newenv = os.environ.copy()
2714 newenv["FRUIT=ORANGE"] = "lemon"
2715 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002716 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002717 except ValueError:
2718 pass
2719 else:
2720 self.assertEqual(exitcode, 127)
2721
Ville Skyttä49b27342017-08-03 09:00:59 +03002722 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002723 filename = support.TESTFN
2724 self.addCleanup(support.unlink, filename)
2725 with open(filename, "w") as fp:
2726 fp.write('import sys, os\n'
2727 'if os.getenv("FRUIT") != "orange=lemon":\n'
2728 ' raise AssertionError')
2729 args = [sys.executable, filename]
2730 newenv = os.environ.copy()
2731 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002732 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002733 self.assertEqual(exitcode, 0)
2734
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002735 @requires_os_func('spawnve')
2736 def test_spawnve_invalid_env(self):
2737 self._test_invalid_env(os.spawnve)
2738
2739 @requires_os_func('spawnvpe')
2740 def test_spawnvpe_invalid_env(self):
2741 self._test_invalid_env(os.spawnvpe)
2742
Serhiy Storchaka77703942017-06-25 07:33:01 +03002743
Brian Curtin0151b8e2010-09-24 13:43:43 +00002744# The introduction of this TestCase caused at least two different errors on
2745# *nix buildbots. Temporarily skip this to let the buildbots move along.
2746@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002747@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2748class LoginTests(unittest.TestCase):
2749 def test_getlogin(self):
2750 user_name = os.getlogin()
2751 self.assertNotEqual(len(user_name), 0)
2752
2753
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002754@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2755 "needs os.getpriority and os.setpriority")
2756class ProgramPriorityTests(unittest.TestCase):
2757 """Tests for os.getpriority() and os.setpriority()."""
2758
2759 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002760
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002761 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2762 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2763 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002764 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2765 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002766 raise unittest.SkipTest("unable to reliably test setpriority "
2767 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002768 else:
2769 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002770 finally:
2771 try:
2772 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2773 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002774 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002775 raise
2776
2777
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002778class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002779
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002781
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002782 def __init__(self, conn):
2783 asynchat.async_chat.__init__(self, conn)
2784 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002785 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002786 self.closed = False
2787 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002788
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002789 def handle_read(self):
2790 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002791 if self.accumulate:
2792 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002793
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002794 def get_data(self):
2795 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002796
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002797 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002798 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002799 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002800
2801 def handle_error(self):
2802 raise
2803
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002804 def __init__(self, address):
2805 threading.Thread.__init__(self)
2806 asyncore.dispatcher.__init__(self)
2807 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2808 self.bind(address)
2809 self.listen(5)
2810 self.host, self.port = self.socket.getsockname()[:2]
2811 self.handler_instance = None
2812 self._active = False
2813 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002814
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002815 # --- public API
2816
2817 @property
2818 def running(self):
2819 return self._active
2820
2821 def start(self):
2822 assert not self.running
2823 self.__flag = threading.Event()
2824 threading.Thread.start(self)
2825 self.__flag.wait()
2826
2827 def stop(self):
2828 assert self.running
2829 self._active = False
2830 self.join()
2831
2832 def wait(self):
2833 # wait for handler connection to be closed, then stop the server
2834 while not getattr(self.handler_instance, "closed", False):
2835 time.sleep(0.001)
2836 self.stop()
2837
2838 # --- internals
2839
2840 def run(self):
2841 self._active = True
2842 self.__flag.set()
2843 while self._active and asyncore.socket_map:
2844 self._active_lock.acquire()
2845 asyncore.loop(timeout=0.001, count=1)
2846 self._active_lock.release()
2847 asyncore.close_all()
2848
2849 def handle_accept(self):
2850 conn, addr = self.accept()
2851 self.handler_instance = self.Handler(conn)
2852
2853 def handle_connect(self):
2854 self.close()
2855 handle_read = handle_connect
2856
2857 def writable(self):
2858 return 0
2859
2860 def handle_error(self):
2861 raise
2862
2863
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002864@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2865class TestSendfile(unittest.TestCase):
2866
Victor Stinner8c663fd2017-11-08 14:44:44 -08002867 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002868 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002869 not sys.platform.startswith("solaris") and \
2870 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002871 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2872 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002873 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2874 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002875
2876 @classmethod
2877 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002878 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002879 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002880
2881 @classmethod
2882 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002883 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002884 support.unlink(support.TESTFN)
2885
2886 def setUp(self):
2887 self.server = SendfileTestServer((support.HOST, 0))
2888 self.server.start()
2889 self.client = socket.socket()
2890 self.client.connect((self.server.host, self.server.port))
2891 self.client.settimeout(1)
2892 # synchronize by waiting for "220 ready" response
2893 self.client.recv(1024)
2894 self.sockno = self.client.fileno()
2895 self.file = open(support.TESTFN, 'rb')
2896 self.fileno = self.file.fileno()
2897
2898 def tearDown(self):
2899 self.file.close()
2900 self.client.close()
2901 if self.server.running:
2902 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002903 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002904
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002905 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002906 """A higher level wrapper representing how an application is
2907 supposed to use sendfile().
2908 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002909 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002910 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002911 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002912 except OSError as err:
2913 if err.errno == errno.ECONNRESET:
2914 # disconnected
2915 raise
2916 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2917 # we have to retry send data
2918 continue
2919 else:
2920 raise
2921
2922 def test_send_whole_file(self):
2923 # normal send
2924 total_sent = 0
2925 offset = 0
2926 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002927 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002928 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2929 if sent == 0:
2930 break
2931 offset += sent
2932 total_sent += sent
2933 self.assertTrue(sent <= nbytes)
2934 self.assertEqual(offset, total_sent)
2935
2936 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002937 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002938 self.client.close()
2939 self.server.wait()
2940 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002941 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002942 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002943
2944 def test_send_at_certain_offset(self):
2945 # start sending a file at a certain offset
2946 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002947 offset = len(self.DATA) // 2
2948 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002949 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002950 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002951 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2952 if sent == 0:
2953 break
2954 offset += sent
2955 total_sent += sent
2956 self.assertTrue(sent <= nbytes)
2957
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002958 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002959 self.client.close()
2960 self.server.wait()
2961 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002962 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002963 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002964 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002965 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002966
2967 def test_offset_overflow(self):
2968 # specify an offset > file size
2969 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002970 try:
2971 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2972 except OSError as e:
2973 # Solaris can raise EINVAL if offset >= file length, ignore.
2974 if e.errno != errno.EINVAL:
2975 raise
2976 else:
2977 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002978 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002979 self.client.close()
2980 self.server.wait()
2981 data = self.server.handler_instance.get_data()
2982 self.assertEqual(data, b'')
2983
2984 def test_invalid_offset(self):
2985 with self.assertRaises(OSError) as cm:
2986 os.sendfile(self.sockno, self.fileno, -1, 4096)
2987 self.assertEqual(cm.exception.errno, errno.EINVAL)
2988
Martin Panterbf19d162015-09-09 01:01:13 +00002989 def test_keywords(self):
2990 # Keyword arguments should be supported
2991 os.sendfile(out=self.sockno, offset=0, count=4096,
2992 **{'in': self.fileno})
2993 if self.SUPPORT_HEADERS_TRAILERS:
2994 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002995 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002996
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002997 # --- headers / trailers tests
2998
Serhiy Storchaka43767632013-11-03 21:31:38 +02002999 @requires_headers_trailers
3000 def test_headers(self):
3001 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003002 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02003003 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003004 headers=[b"x" * 512, b"y" * 256])
3005 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003006 total_sent += sent
3007 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003008 while total_sent < len(expected_data):
3009 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003010 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3011 offset, nbytes)
3012 if sent == 0:
3013 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003014 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003015 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003016 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003017
Serhiy Storchaka43767632013-11-03 21:31:38 +02003018 self.assertEqual(total_sent, len(expected_data))
3019 self.client.close()
3020 self.server.wait()
3021 data = self.server.handler_instance.get_data()
3022 self.assertEqual(hash(data), hash(expected_data))
3023
3024 @requires_headers_trailers
3025 def test_trailers(self):
3026 TESTFN2 = support.TESTFN + "2"
3027 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003028
3029 self.addCleanup(support.unlink, TESTFN2)
3030 create_file(TESTFN2, file_data)
3031
3032 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003033 os.sendfile(self.sockno, f.fileno(), 0, 5,
3034 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003035 self.client.close()
3036 self.server.wait()
3037 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003038 self.assertEqual(data, b"abcde123456789")
3039
3040 @requires_headers_trailers
3041 @requires_32b
3042 def test_headers_overflow_32bits(self):
3043 self.server.handler_instance.accumulate = False
3044 with self.assertRaises(OSError) as cm:
3045 os.sendfile(self.sockno, self.fileno, 0, 0,
3046 headers=[b"x" * 2**16] * 2**15)
3047 self.assertEqual(cm.exception.errno, errno.EINVAL)
3048
3049 @requires_headers_trailers
3050 @requires_32b
3051 def test_trailers_overflow_32bits(self):
3052 self.server.handler_instance.accumulate = False
3053 with self.assertRaises(OSError) as cm:
3054 os.sendfile(self.sockno, self.fileno, 0, 0,
3055 trailers=[b"x" * 2**16] * 2**15)
3056 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003057
Serhiy Storchaka43767632013-11-03 21:31:38 +02003058 @requires_headers_trailers
3059 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3060 'test needs os.SF_NODISKIO')
3061 def test_flags(self):
3062 try:
3063 os.sendfile(self.sockno, self.fileno, 0, 4096,
3064 flags=os.SF_NODISKIO)
3065 except OSError as err:
3066 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3067 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003068
3069
Larry Hastings9cf065c2012-06-22 16:30:09 -07003070def supports_extended_attributes():
3071 if not hasattr(os, "setxattr"):
3072 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01003073
Larry Hastings9cf065c2012-06-22 16:30:09 -07003074 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01003075 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003076 try:
3077 os.setxattr(fp.fileno(), b"user.test", b"")
3078 except OSError:
3079 return False
3080 finally:
3081 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003082
3083 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07003084
3085
3086@unittest.skipUnless(supports_extended_attributes(),
3087 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01003088# Kernels < 2.6.39 don't respect setxattr flags.
3089@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003090class ExtendedAttributeTests(unittest.TestCase):
3091
Larry Hastings9cf065c2012-06-22 16:30:09 -07003092 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04003093 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003094 self.addCleanup(support.unlink, fn)
3095 create_file(fn)
3096
Benjamin Peterson799bd802011-08-31 22:15:17 -04003097 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003098 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003099 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003100
Victor Stinnerf12e5062011-10-16 22:12:03 +02003101 init_xattr = listxattr(fn)
3102 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003103
Larry Hastings9cf065c2012-06-22 16:30:09 -07003104 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003105 xattr = set(init_xattr)
3106 xattr.add("user.test")
3107 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003108 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3109 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3110 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003111
Benjamin Peterson799bd802011-08-31 22:15:17 -04003112 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003113 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003114 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003115
Benjamin Peterson799bd802011-08-31 22:15:17 -04003116 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003117 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003118 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003119
Larry Hastings9cf065c2012-06-22 16:30:09 -07003120 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003121 xattr.add("user.test2")
3122 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003123 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003124
Benjamin Peterson799bd802011-08-31 22:15:17 -04003125 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003126 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003127 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003128
Victor Stinnerf12e5062011-10-16 22:12:03 +02003129 xattr.remove("user.test")
3130 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003131 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3132 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3133 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3134 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003135 many = sorted("user.test{}".format(i) for i in range(100))
3136 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003137 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003138 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003139
Larry Hastings9cf065c2012-06-22 16:30:09 -07003140 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003141 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003142 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003143
3144 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3145 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003146
3147 def test_simple(self):
3148 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3149 os.listxattr)
3150
3151 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003152 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3153 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003154
3155 def test_fds(self):
3156 def getxattr(path, *args):
3157 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003158 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003159 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003160 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003161 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003162 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003163 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003164 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003165 def listxattr(path, *args):
3166 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003167 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003168 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3169
3170
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003171@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3172class TermsizeTests(unittest.TestCase):
3173 def test_does_not_crash(self):
3174 """Check if get_terminal_size() returns a meaningful value.
3175
3176 There's no easy portable way to actually check the size of the
3177 terminal, so let's check if it returns something sensible instead.
3178 """
3179 try:
3180 size = os.get_terminal_size()
3181 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003182 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003183 # Under win32 a generic OSError can be thrown if the
3184 # handle cannot be retrieved
3185 self.skipTest("failed to query terminal size")
3186 raise
3187
Antoine Pitroucfade362012-02-08 23:48:59 +01003188 self.assertGreaterEqual(size.columns, 0)
3189 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003190
3191 def test_stty_match(self):
3192 """Check if stty returns the same results
3193
3194 stty actually tests stdin, so get_terminal_size is invoked on
3195 stdin explicitly. If stty succeeded, then get_terminal_size()
3196 should work too.
3197 """
3198 try:
3199 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003200 except (FileNotFoundError, subprocess.CalledProcessError,
3201 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003202 self.skipTest("stty invocation failed")
3203 expected = (int(size[1]), int(size[0])) # reversed order
3204
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003205 try:
3206 actual = os.get_terminal_size(sys.__stdin__.fileno())
3207 except OSError as e:
3208 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3209 # Under win32 a generic OSError can be thrown if the
3210 # handle cannot be retrieved
3211 self.skipTest("failed to query terminal size")
3212 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003213 self.assertEqual(expected, actual)
3214
3215
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003216@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
Christian Heimes6eb814b2019-05-30 11:27:06 +02003217@support.requires_linux_version(3, 17)
Zackery Spytz43fdbd22019-05-29 13:57:07 -06003218class MemfdCreateTests(unittest.TestCase):
3219 def test_memfd_create(self):
3220 fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3221 self.assertNotEqual(fd, -1)
3222 self.addCleanup(os.close, fd)
3223 self.assertFalse(os.get_inheritable(fd))
3224 with open(fd, "wb", closefd=False) as f:
3225 f.write(b'memfd_create')
3226 self.assertEqual(f.tell(), 12)
3227
3228 fd2 = os.memfd_create("Hi")
3229 self.addCleanup(os.close, fd2)
3230 self.assertFalse(os.get_inheritable(fd2))
3231
3232
Victor Stinner292c8352012-10-30 02:17:38 +01003233class OSErrorTests(unittest.TestCase):
3234 def setUp(self):
3235 class Str(str):
3236 pass
3237
Victor Stinnerafe17062012-10-31 22:47:43 +01003238 self.bytes_filenames = []
3239 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003240 if support.TESTFN_UNENCODABLE is not None:
3241 decoded = support.TESTFN_UNENCODABLE
3242 else:
3243 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003244 self.unicode_filenames.append(decoded)
3245 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003246 if support.TESTFN_UNDECODABLE is not None:
3247 encoded = support.TESTFN_UNDECODABLE
3248 else:
3249 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003250 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003251 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003252 self.bytes_filenames.append(memoryview(encoded))
3253
3254 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003255
3256 def test_oserror_filename(self):
3257 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003258 (self.filenames, os.chdir,),
3259 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003260 (self.filenames, os.lstat,),
3261 (self.filenames, os.open, os.O_RDONLY),
3262 (self.filenames, os.rmdir,),
3263 (self.filenames, os.stat,),
3264 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003265 ]
3266 if sys.platform == "win32":
3267 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003268 (self.bytes_filenames, os.rename, b"dst"),
3269 (self.bytes_filenames, os.replace, b"dst"),
3270 (self.unicode_filenames, os.rename, "dst"),
3271 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003272 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003273 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003274 else:
3275 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003276 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003277 (self.filenames, os.rename, "dst"),
3278 (self.filenames, os.replace, "dst"),
3279 ))
3280 if hasattr(os, "chown"):
3281 funcs.append((self.filenames, os.chown, 0, 0))
3282 if hasattr(os, "lchown"):
3283 funcs.append((self.filenames, os.lchown, 0, 0))
3284 if hasattr(os, "truncate"):
3285 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003286 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003287 funcs.append((self.filenames, os.chflags, 0))
3288 if hasattr(os, "lchflags"):
3289 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003290 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003291 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003292 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003293 if sys.platform == "win32":
3294 funcs.append((self.bytes_filenames, os.link, b"dst"))
3295 funcs.append((self.unicode_filenames, os.link, "dst"))
3296 else:
3297 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003298 if hasattr(os, "listxattr"):
3299 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003300 (self.filenames, os.listxattr,),
3301 (self.filenames, os.getxattr, "user.test"),
3302 (self.filenames, os.setxattr, "user.test", b'user'),
3303 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003304 ))
3305 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003306 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003307 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003308 if sys.platform == "win32":
3309 funcs.append((self.unicode_filenames, os.readlink,))
3310 else:
3311 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003312
Steve Dowercc16be82016-09-08 10:35:16 -07003313
Victor Stinnerafe17062012-10-31 22:47:43 +01003314 for filenames, func, *func_args in funcs:
3315 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003316 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003317 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003318 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003319 else:
3320 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3321 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003322 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003323 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003324 except UnicodeDecodeError:
3325 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003326 else:
3327 self.fail("No exception thrown by {}".format(func))
3328
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003329class CPUCountTests(unittest.TestCase):
3330 def test_cpu_count(self):
3331 cpus = os.cpu_count()
3332 if cpus is not None:
3333 self.assertIsInstance(cpus, int)
3334 self.assertGreater(cpus, 0)
3335 else:
3336 self.skipTest("Could not determine the number of CPUs")
3337
Victor Stinnerdaf45552013-08-28 00:53:59 +02003338
3339class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003340 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003341 fd = os.open(__file__, os.O_RDONLY)
3342 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003343 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003344
Victor Stinnerdaf45552013-08-28 00:53:59 +02003345 os.set_inheritable(fd, True)
3346 self.assertEqual(os.get_inheritable(fd), True)
3347
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003348 @unittest.skipIf(fcntl is None, "need fcntl")
3349 def test_get_inheritable_cloexec(self):
3350 fd = os.open(__file__, os.O_RDONLY)
3351 self.addCleanup(os.close, fd)
3352 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003353
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003354 # clear FD_CLOEXEC flag
3355 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3356 flags &= ~fcntl.FD_CLOEXEC
3357 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003358
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003359 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003360
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003361 @unittest.skipIf(fcntl is None, "need fcntl")
3362 def test_set_inheritable_cloexec(self):
3363 fd = os.open(__file__, os.O_RDONLY)
3364 self.addCleanup(os.close, fd)
3365 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3366 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003367
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003368 os.set_inheritable(fd, True)
3369 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3370 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003371
Victor Stinnerdaf45552013-08-28 00:53:59 +02003372 def test_open(self):
3373 fd = os.open(__file__, os.O_RDONLY)
3374 self.addCleanup(os.close, fd)
3375 self.assertEqual(os.get_inheritable(fd), False)
3376
3377 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3378 def test_pipe(self):
3379 rfd, wfd = os.pipe()
3380 self.addCleanup(os.close, rfd)
3381 self.addCleanup(os.close, wfd)
3382 self.assertEqual(os.get_inheritable(rfd), False)
3383 self.assertEqual(os.get_inheritable(wfd), False)
3384
3385 def test_dup(self):
3386 fd1 = os.open(__file__, os.O_RDONLY)
3387 self.addCleanup(os.close, fd1)
3388
3389 fd2 = os.dup(fd1)
3390 self.addCleanup(os.close, fd2)
3391 self.assertEqual(os.get_inheritable(fd2), False)
3392
Zackery Spytz28fca0c2019-06-17 01:17:14 -06003393 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3394 def test_dup_nul(self):
3395 # os.dup() was creating inheritable fds for character files.
3396 fd1 = os.open('NUL', os.O_RDONLY)
3397 self.addCleanup(os.close, fd1)
3398 fd2 = os.dup(fd1)
3399 self.addCleanup(os.close, fd2)
3400 self.assertFalse(os.get_inheritable(fd2))
3401
Victor Stinnerdaf45552013-08-28 00:53:59 +02003402 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3403 def test_dup2(self):
3404 fd = os.open(__file__, os.O_RDONLY)
3405 self.addCleanup(os.close, fd)
3406
3407 # inheritable by default
3408 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003409 self.addCleanup(os.close, fd2)
3410 self.assertEqual(os.dup2(fd, fd2), fd2)
3411 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003412
3413 # force non-inheritable
3414 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003415 self.addCleanup(os.close, fd3)
3416 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3417 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003418
3419 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3420 def test_openpty(self):
3421 master_fd, slave_fd = os.openpty()
3422 self.addCleanup(os.close, master_fd)
3423 self.addCleanup(os.close, slave_fd)
3424 self.assertEqual(os.get_inheritable(master_fd), False)
3425 self.assertEqual(os.get_inheritable(slave_fd), False)
3426
3427
Brett Cannon3f9183b2016-08-26 14:44:48 -07003428class PathTConverterTests(unittest.TestCase):
3429 # tuples of (function name, allows fd arguments, additional arguments to
3430 # function, cleanup function)
3431 functions = [
3432 ('stat', True, (), None),
3433 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003434 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003435 ('chflags', False, (0,), None),
3436 ('lchflags', False, (0,), None),
3437 ('open', False, (0,), getattr(os, 'close', None)),
3438 ]
3439
3440 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003441 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003442 if os.name == 'nt':
3443 bytes_fspath = bytes_filename = None
3444 else:
3445 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003446 bytes_fspath = FakePath(bytes_filename)
3447 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003448 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003449 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003450
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003451 int_fspath = FakePath(fd)
3452 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003453
3454 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3455 with self.subTest(name=name):
3456 try:
3457 fn = getattr(os, name)
3458 except AttributeError:
3459 continue
3460
Brett Cannon8f96a302016-08-26 19:30:11 -07003461 for path in (str_filename, bytes_filename, str_fspath,
3462 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003463 if path is None:
3464 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003465 with self.subTest(name=name, path=path):
3466 result = fn(path, *extra_args)
3467 if cleanup_fn is not None:
3468 cleanup_fn(result)
3469
3470 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003471 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003472 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003473
3474 if allow_fd:
3475 result = fn(fd, *extra_args) # should not fail
3476 if cleanup_fn is not None:
3477 cleanup_fn(result)
3478 else:
3479 with self.assertRaisesRegex(
3480 TypeError,
3481 'os.PathLike'):
3482 fn(fd, *extra_args)
3483
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003484 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003485 msg = r'__fspath__\(\) to return str or bytes, not %s'
3486 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003487 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003488 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003489 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003490 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003491 os.stat(FakePath(object()))
3492
Brett Cannon3f9183b2016-08-26 14:44:48 -07003493
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003494@unittest.skipUnless(hasattr(os, 'get_blocking'),
3495 'needs os.get_blocking() and os.set_blocking()')
3496class BlockingTests(unittest.TestCase):
3497 def test_blocking(self):
3498 fd = os.open(__file__, os.O_RDONLY)
3499 self.addCleanup(os.close, fd)
3500 self.assertEqual(os.get_blocking(fd), True)
3501
3502 os.set_blocking(fd, False)
3503 self.assertEqual(os.get_blocking(fd), False)
3504
3505 os.set_blocking(fd, True)
3506 self.assertEqual(os.get_blocking(fd), True)
3507
3508
Yury Selivanov97e2e062014-09-26 12:33:06 -04003509
3510class ExportsTests(unittest.TestCase):
3511 def test_os_all(self):
3512 self.assertIn('open', os.__all__)
3513 self.assertIn('walk', os.__all__)
3514
3515
Victor Stinner6036e442015-03-08 01:58:04 +01003516class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003517 check_no_resource_warning = support.check_no_resource_warning
3518
Victor Stinner6036e442015-03-08 01:58:04 +01003519 def setUp(self):
3520 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003521 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003522 self.addCleanup(support.rmtree, self.path)
3523 os.mkdir(self.path)
3524
3525 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003526 path = self.bytes_path if isinstance(name, bytes) else self.path
3527 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003528 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003529 return filename
3530
3531 def get_entries(self, names):
3532 entries = dict((entry.name, entry)
3533 for entry in os.scandir(self.path))
3534 self.assertEqual(sorted(entries.keys()), names)
3535 return entries
3536
3537 def assert_stat_equal(self, stat1, stat2, skip_fields):
3538 if skip_fields:
3539 for attr in dir(stat1):
3540 if not attr.startswith("st_"):
3541 continue
3542 if attr in ("st_dev", "st_ino", "st_nlink"):
3543 continue
3544 self.assertEqual(getattr(stat1, attr),
3545 getattr(stat2, attr),
3546 (stat1, stat2, attr))
3547 else:
3548 self.assertEqual(stat1, stat2)
3549
3550 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003551 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003552 self.assertEqual(entry.name, name)
3553 self.assertEqual(entry.path, os.path.join(self.path, name))
3554 self.assertEqual(entry.inode(),
3555 os.stat(entry.path, follow_symlinks=False).st_ino)
3556
3557 entry_stat = os.stat(entry.path)
3558 self.assertEqual(entry.is_dir(),
3559 stat.S_ISDIR(entry_stat.st_mode))
3560 self.assertEqual(entry.is_file(),
3561 stat.S_ISREG(entry_stat.st_mode))
3562 self.assertEqual(entry.is_symlink(),
3563 os.path.islink(entry.path))
3564
3565 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3566 self.assertEqual(entry.is_dir(follow_symlinks=False),
3567 stat.S_ISDIR(entry_lstat.st_mode))
3568 self.assertEqual(entry.is_file(follow_symlinks=False),
3569 stat.S_ISREG(entry_lstat.st_mode))
3570
3571 self.assert_stat_equal(entry.stat(),
3572 entry_stat,
3573 os.name == 'nt' and not is_symlink)
3574 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3575 entry_lstat,
3576 os.name == 'nt')
3577
3578 def test_attributes(self):
3579 link = hasattr(os, 'link')
3580 symlink = support.can_symlink()
3581
3582 dirname = os.path.join(self.path, "dir")
3583 os.mkdir(dirname)
3584 filename = self.create_file("file.txt")
3585 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003586 try:
3587 os.link(filename, os.path.join(self.path, "link_file.txt"))
3588 except PermissionError as e:
3589 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003590 if symlink:
3591 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3592 target_is_directory=True)
3593 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3594
3595 names = ['dir', 'file.txt']
3596 if link:
3597 names.append('link_file.txt')
3598 if symlink:
3599 names.extend(('symlink_dir', 'symlink_file.txt'))
3600 entries = self.get_entries(names)
3601
3602 entry = entries['dir']
3603 self.check_entry(entry, 'dir', True, False, False)
3604
3605 entry = entries['file.txt']
3606 self.check_entry(entry, 'file.txt', False, True, False)
3607
3608 if link:
3609 entry = entries['link_file.txt']
3610 self.check_entry(entry, 'link_file.txt', False, True, False)
3611
3612 if symlink:
3613 entry = entries['symlink_dir']
3614 self.check_entry(entry, 'symlink_dir', True, False, True)
3615
3616 entry = entries['symlink_file.txt']
3617 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3618
3619 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003620 path = self.bytes_path if isinstance(name, bytes) else self.path
3621 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003622 self.assertEqual(len(entries), 1)
3623
3624 entry = entries[0]
3625 self.assertEqual(entry.name, name)
3626 return entry
3627
Brett Cannon96881cd2016-06-10 14:37:21 -07003628 def create_file_entry(self, name='file.txt'):
3629 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003630 return self.get_entry(os.path.basename(filename))
3631
3632 def test_current_directory(self):
3633 filename = self.create_file()
3634 old_dir = os.getcwd()
3635 try:
3636 os.chdir(self.path)
3637
3638 # call scandir() without parameter: it must list the content
3639 # of the current directory
3640 entries = dict((entry.name, entry) for entry in os.scandir())
3641 self.assertEqual(sorted(entries.keys()),
3642 [os.path.basename(filename)])
3643 finally:
3644 os.chdir(old_dir)
3645
3646 def test_repr(self):
3647 entry = self.create_file_entry()
3648 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3649
Brett Cannon96881cd2016-06-10 14:37:21 -07003650 def test_fspath_protocol(self):
3651 entry = self.create_file_entry()
3652 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3653
3654 def test_fspath_protocol_bytes(self):
3655 bytes_filename = os.fsencode('bytesfile.txt')
3656 bytes_entry = self.create_file_entry(name=bytes_filename)
3657 fspath = os.fspath(bytes_entry)
3658 self.assertIsInstance(fspath, bytes)
3659 self.assertEqual(fspath,
3660 os.path.join(os.fsencode(self.path),bytes_filename))
3661
Victor Stinner6036e442015-03-08 01:58:04 +01003662 def test_removed_dir(self):
3663 path = os.path.join(self.path, 'dir')
3664
3665 os.mkdir(path)
3666 entry = self.get_entry('dir')
3667 os.rmdir(path)
3668
3669 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3670 if os.name == 'nt':
3671 self.assertTrue(entry.is_dir())
3672 self.assertFalse(entry.is_file())
3673 self.assertFalse(entry.is_symlink())
3674 if os.name == 'nt':
3675 self.assertRaises(FileNotFoundError, entry.inode)
3676 # don't fail
3677 entry.stat()
3678 entry.stat(follow_symlinks=False)
3679 else:
3680 self.assertGreater(entry.inode(), 0)
3681 self.assertRaises(FileNotFoundError, entry.stat)
3682 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3683
3684 def test_removed_file(self):
3685 entry = self.create_file_entry()
3686 os.unlink(entry.path)
3687
3688 self.assertFalse(entry.is_dir())
3689 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3690 if os.name == 'nt':
3691 self.assertTrue(entry.is_file())
3692 self.assertFalse(entry.is_symlink())
3693 if os.name == 'nt':
3694 self.assertRaises(FileNotFoundError, entry.inode)
3695 # don't fail
3696 entry.stat()
3697 entry.stat(follow_symlinks=False)
3698 else:
3699 self.assertGreater(entry.inode(), 0)
3700 self.assertRaises(FileNotFoundError, entry.stat)
3701 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3702
3703 def test_broken_symlink(self):
3704 if not support.can_symlink():
3705 return self.skipTest('cannot create symbolic link')
3706
3707 filename = self.create_file("file.txt")
3708 os.symlink(filename,
3709 os.path.join(self.path, "symlink.txt"))
3710 entries = self.get_entries(['file.txt', 'symlink.txt'])
3711 entry = entries['symlink.txt']
3712 os.unlink(filename)
3713
3714 self.assertGreater(entry.inode(), 0)
3715 self.assertFalse(entry.is_dir())
3716 self.assertFalse(entry.is_file()) # broken symlink returns False
3717 self.assertFalse(entry.is_dir(follow_symlinks=False))
3718 self.assertFalse(entry.is_file(follow_symlinks=False))
3719 self.assertTrue(entry.is_symlink())
3720 self.assertRaises(FileNotFoundError, entry.stat)
3721 # don't fail
3722 entry.stat(follow_symlinks=False)
3723
3724 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003725 self.create_file("file.txt")
3726
3727 path_bytes = os.fsencode(self.path)
3728 entries = list(os.scandir(path_bytes))
3729 self.assertEqual(len(entries), 1, entries)
3730 entry = entries[0]
3731
3732 self.assertEqual(entry.name, b'file.txt')
3733 self.assertEqual(entry.path,
3734 os.fsencode(os.path.join(self.path, 'file.txt')))
3735
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003736 def test_bytes_like(self):
3737 self.create_file("file.txt")
3738
3739 for cls in bytearray, memoryview:
3740 path_bytes = cls(os.fsencode(self.path))
3741 with self.assertWarns(DeprecationWarning):
3742 entries = list(os.scandir(path_bytes))
3743 self.assertEqual(len(entries), 1, entries)
3744 entry = entries[0]
3745
3746 self.assertEqual(entry.name, b'file.txt')
3747 self.assertEqual(entry.path,
3748 os.fsencode(os.path.join(self.path, 'file.txt')))
3749 self.assertIs(type(entry.name), bytes)
3750 self.assertIs(type(entry.path), bytes)
3751
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003752 @unittest.skipUnless(os.listdir in os.supports_fd,
3753 'fd support for listdir required for this test.')
3754 def test_fd(self):
3755 self.assertIn(os.scandir, os.supports_fd)
3756 self.create_file('file.txt')
3757 expected_names = ['file.txt']
3758 if support.can_symlink():
3759 os.symlink('file.txt', os.path.join(self.path, 'link'))
3760 expected_names.append('link')
3761
3762 fd = os.open(self.path, os.O_RDONLY)
3763 try:
3764 with os.scandir(fd) as it:
3765 entries = list(it)
3766 names = [entry.name for entry in entries]
3767 self.assertEqual(sorted(names), expected_names)
3768 self.assertEqual(names, os.listdir(fd))
3769 for entry in entries:
3770 self.assertEqual(entry.path, entry.name)
3771 self.assertEqual(os.fspath(entry), entry.name)
3772 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3773 if os.stat in os.supports_dir_fd:
3774 st = os.stat(entry.name, dir_fd=fd)
3775 self.assertEqual(entry.stat(), st)
3776 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3777 self.assertEqual(entry.stat(follow_symlinks=False), st)
3778 finally:
3779 os.close(fd)
3780
Victor Stinner6036e442015-03-08 01:58:04 +01003781 def test_empty_path(self):
3782 self.assertRaises(FileNotFoundError, os.scandir, '')
3783
3784 def test_consume_iterator_twice(self):
3785 self.create_file("file.txt")
3786 iterator = os.scandir(self.path)
3787
3788 entries = list(iterator)
3789 self.assertEqual(len(entries), 1, entries)
3790
3791 # check than consuming the iterator twice doesn't raise exception
3792 entries2 = list(iterator)
3793 self.assertEqual(len(entries2), 0, entries2)
3794
3795 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003796 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003797 self.assertRaises(TypeError, os.scandir, obj)
3798
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003799 def test_close(self):
3800 self.create_file("file.txt")
3801 self.create_file("file2.txt")
3802 iterator = os.scandir(self.path)
3803 next(iterator)
3804 iterator.close()
3805 # multiple closes
3806 iterator.close()
3807 with self.check_no_resource_warning():
3808 del iterator
3809
3810 def test_context_manager(self):
3811 self.create_file("file.txt")
3812 self.create_file("file2.txt")
3813 with os.scandir(self.path) as iterator:
3814 next(iterator)
3815 with self.check_no_resource_warning():
3816 del iterator
3817
3818 def test_context_manager_close(self):
3819 self.create_file("file.txt")
3820 self.create_file("file2.txt")
3821 with os.scandir(self.path) as iterator:
3822 next(iterator)
3823 iterator.close()
3824
3825 def test_context_manager_exception(self):
3826 self.create_file("file.txt")
3827 self.create_file("file2.txt")
3828 with self.assertRaises(ZeroDivisionError):
3829 with os.scandir(self.path) as iterator:
3830 next(iterator)
3831 1/0
3832 with self.check_no_resource_warning():
3833 del iterator
3834
3835 def test_resource_warning(self):
3836 self.create_file("file.txt")
3837 self.create_file("file2.txt")
3838 iterator = os.scandir(self.path)
3839 next(iterator)
3840 with self.assertWarns(ResourceWarning):
3841 del iterator
3842 support.gc_collect()
3843 # exhausted iterator
3844 iterator = os.scandir(self.path)
3845 list(iterator)
3846 with self.check_no_resource_warning():
3847 del iterator
3848
Victor Stinner6036e442015-03-08 01:58:04 +01003849
Ethan Furmancdc08792016-06-02 15:06:09 -07003850class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003851
3852 # Abstracted so it can be overridden to test pure Python implementation
3853 # if a C version is provided.
3854 fspath = staticmethod(os.fspath)
3855
Ethan Furmancdc08792016-06-02 15:06:09 -07003856 def test_return_bytes(self):
3857 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003858 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003859
3860 def test_return_string(self):
3861 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003862 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003863
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003864 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003865 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003866 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003867
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003868 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003869 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3870 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3871
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003872 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003873 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3874 self.assertTrue(issubclass(FakePath, os.PathLike))
3875 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003876
Ethan Furmancdc08792016-06-02 15:06:09 -07003877 def test_garbage_in_exception_out(self):
3878 vapor = type('blah', (), {})
3879 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003880 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003881
3882 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003883 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003884
Brett Cannon044283a2016-07-15 10:41:49 -07003885 def test_bad_pathlike(self):
3886 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003887 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003888 # __fspath__ attribute that is not callable.
3889 c = type('foo', (), {})
3890 c.__fspath__ = 1
3891 self.assertRaises(TypeError, self.fspath, c())
3892 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003893 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003894 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003895
Victor Stinnerc29b5852017-11-02 07:28:27 -07003896
3897class TimesTests(unittest.TestCase):
3898 def test_times(self):
3899 times = os.times()
3900 self.assertIsInstance(times, os.times_result)
3901
3902 for field in ('user', 'system', 'children_user', 'children_system',
3903 'elapsed'):
3904 value = getattr(times, field)
3905 self.assertIsInstance(value, float)
3906
3907 if os.name == 'nt':
3908 self.assertEqual(times.children_user, 0)
3909 self.assertEqual(times.children_system, 0)
3910 self.assertEqual(times.elapsed, 0)
3911
3912
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003913# Only test if the C version is provided, otherwise TestPEP519 already tested
3914# the pure Python implementation.
3915if hasattr(os, "_fspath"):
3916 class TestPEP519PurePython(TestPEP519):
3917
3918 """Explicitly test the pure Python implementation of os.fspath()."""
3919
3920 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003921
3922
Fred Drake2e2be372001-09-20 21:33:42 +00003923if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003924 unittest.main()