blob: 527f81463280ac1dc8336cce8c63fbd65f6b3cf0 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
# Optional modules: each name is bound to None when the module cannot be
# imported on this platform.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
# all_users holds the uid of every password-database entry, or is empty when
# pwd (or pwd.getpwall) is unavailable.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
# Without _testcapi, fall back to sys.maxsize for both limits.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020054
Berker Peksagce643912015-05-06 06:33:17 +030055from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020056from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000057
Victor Stinner923590e2016-03-24 09:11:48 +010058
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
74
Victor Stinner923590e2016-03-24 09:11:48 +010075
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os`` has attribute *name*.

    The skip reason reads ``'requires os.<name>'``.
    """
    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
78
79
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation binary mode ("xb"), unbuffered,
    so the call fails if *filename* already exists.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)
83
84
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for file-descriptor level functions of ``os`` using TESTFN."""

    def setUp(self):
        # Remove any TESTFN left behind; lexists() also catches a symlink
        # whose target does not exist (see test_symlink_keywords).
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the refcount of its argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(*args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first so that fdopen_helper() can open it.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run last-in first-out: the file is closed, then unlinked.
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run last-in first-out: the file is closed, then unlinked.
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200308
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects of os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test works on a 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check os.stat(fname): index and attribute access, immutability,
        and the stat_result constructor."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The time attributes are compared after truncation to int.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range index access must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # No failure is asserted here: both outcomes (TypeError or success)
        # are tolerated, exactly as before.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # No failure is asserted here: both outcomes (TypeError or success)
        # are tolerated, exactly as before.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Require *result* to carry an int st_file_attributes that fits in
        32 unsigned bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
528
Victor Stinner47aacc82015-06-12 17:26:23 +0200529
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a file (and a directory) under TESTFN."""

    def setUp(self):
        # TESTFN is used as a directory here; the file under test lives in it.
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        """Call set_time(filename, (atime_ns, mtime_ns)) and check that
        os.stat() reports those timestamps.

        *filename* defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Call set_time(self.fname) and check that the file's mtime is close
        to the system clock sampled just before the call."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of the volume holding *path* on
        Windows, or None when it cannot be determined (always None on other
        platforms)."""
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # Features the platform lacks must be rejected explicitly.
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
750
Victor Stinner47aacc82015-06-12 17:26:23 +0200751
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000752from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000753
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where supported, bytes) environment so
        # that tearDown() can restore it, then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name in reference:
            os.environ[name] = reference[name]

    def tearDown(self):
        # Discard whatever the test left behind and reinstate the snapshot.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Items installed into os.environ by setUp().
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, emptied in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        # A variable set through update() must be visible to a child shell.
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            # Nothing may follow the three lines.
            with self.assertRaises(StopIteration):
                next(lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for item in os.environ.items():
            for part in item:
                self.assertEqual(type(part), str)

    def test_items(self):
        reference = self._reference()
        for key in reference:
            self.assertEqual(os.environ.get(key), reference[key])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        actual = repr(env)
        pairs = ['{!r}: {!r}'.format(key, value)
                 for key, value in env.items()]
        expected = 'environ({{{}}})'.format(', '.join(pairs))
        self.assertEqual(actual, expected)

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        custom_env = {'PATH': os.pathsep.join(search_dirs)}

        # Test that defaulting to os.environ works: swap a plain dict in
        # for os.environ while get_exec_path() is called without an env.
        original_environ = os.environ
        os.environ = dict(custom_env)
        try:
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(custom_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            with self.assertRaises(ValueError):
                os.get_exec_path(mixed_env)

        # bytes key and/or value
        bytes_envs = (
            {b'PATH': b'abc'},
            {b'PATH': 'abc'},
            {'PATH': b'abc'},
        )
        for bytes_env in bytes_envs:
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest("U+20AC character is not encodable to %s"
                          % (fs_encoding,))
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            bad_key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            bad_key, expected_error = 'key=', OSError
        with self.assertRaises(expected_error):
            del os.environ[bad_key]

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup and deletion must both raise a KeyError that carries the
        # very key object passed in and has its context suppressed.
        for operation in (os.environ.__getitem__, os.environ.__delitem__):
            with self.assertRaises(KeyError) as caught:
                operation(missing)
            error = caught.exception
            self.assertIs(error.args[0], missing)
            self.assertTrue(error.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Shared body of the test_iter_error_* tests: keep iterating over
        # *collection* after os.environ gained a key in the meantime.
        marker_key = "__new_key__"
        marker_value = "test_environ_iteration"
        walker = iter(collection)

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[marker_key] = marker_value

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[marker_key], marker_value)
        finally:
            del os.environ[marker_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
932
Victor Stinner6d101392013-04-14 16:35:04 +0200933
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper hiding the minor differences between os.walk and os.fwalk so
    # that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # Named "tmp5" to agree with the tree above and with the target of
        # broken_link3 below.
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        # self.sub2_tree is the (root, dirs, files) triple, each list
        # sorted, that a walk is expected to report for SUB2.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Try to make SUB21 unreadable.  Where that has no effect (listing
        # it still succeeds), remove SUB21 altogether and drop it from the
        # expected tree.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Restore the permissions at cleanup time; cleanups run in
            # reverse order, so this happens before the rmtree above.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        # walk_path lets test_file_like_path() rerun this test with a
        # path-like object instead of the default str path.
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it has been listed but before it is visited:
        # the walk must report an error for it and carry on with the
        # other directories.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001107
Charles-François Natali7372b062012-02-05 15:15:38 +01001108
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk() to the 3-tuple interface the inherited tests
        # expect by dropping the directory file descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single entry point to os.fwalk(), so that subclasses can
        # substitute a variant.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        Compare fwalk() results with walk() results.

        Both keyword dictionaries are copied, not modified.  For every
        combination of topdown and follow_symlinks, each root yielded by
        fwalk() must also be yielded by os.walk(), with the same sets of
        directories and files.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: should
        # os.open() fail, its own error is reported instead of a NameError
        # raised by os.close(fd) in the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1176
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Run os.walk() on a bytes path while showing str names to the
        # inherited tests.
        try:
            follow = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        else:
            kwargs['followlinks'] = follow
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy any in-place edits the consumer made (e.g. pruning
            # dirs) back into the lists owned by os.walk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1189
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Run os.fwalk() on a bytes path while showing str names to the
        # inherited tests; the directory descriptor is passed through.
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files, topfd)
            # Copy any in-place edits the consumer made back into the
            # lists owned by os.fwalk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1201
Charles-François Natali7372b062012-02-05 15:15:38 +01001202
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The requested mode applies to the leaf only; the
                # intermediate directory gets 0o777 minus the umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when one of the
        # checks below fails.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when a check fails; otherwise tearDown()
        # would end up calling os.removedirs() on a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1295
Andrew Svetlov405faed2012-12-25 12:18:09 +02001296
R David Murrayf2ad1732014-12-25 18:36:56 -05001297@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1298class ChownFileTests(unittest.TestCase):
1299
Berker Peksag036a71b2015-07-21 09:29:48 +03001300 @classmethod
1301 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001302 os.mkdir(support.TESTFN)
1303
1304 def test_chown_uid_gid_arguments_must_be_index(self):
1305 stat = os.stat(support.TESTFN)
1306 uid = stat.st_uid
1307 gid = stat.st_gid
1308 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1309 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1310 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1311 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1312 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1313
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001314 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1315 def test_chown_gid(self):
1316 groups = os.getgroups()
1317 if len(groups) < 2:
1318 self.skipTest("test needs at least 2 groups")
1319
R David Murrayf2ad1732014-12-25 18:36:56 -05001320 gid_1, gid_2 = groups[:2]
1321 uid = os.stat(support.TESTFN).st_uid
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001322
R David Murrayf2ad1732014-12-25 18:36:56 -05001323 os.chown(support.TESTFN, uid, gid_1)
1324 gid = os.stat(support.TESTFN).st_gid
1325 self.assertEqual(gid, gid_1)
Victor Stinnerd7c87d92019-06-25 17:06:24 +02001326
R David Murrayf2ad1732014-12-25 18:36:56 -05001327 os.chown(support.TESTFN, uid, gid_2)
1328 gid = os.stat(support.TESTFN).st_gid
1329 self.assertEqual(gid, gid_2)
1330
1331 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1332 "test needs root privilege and more than one user")
1333 def test_chown_with_root(self):
1334 uid_1, uid_2 = all_users[:2]
1335 gid = os.stat(support.TESTFN).st_gid
1336 os.chown(support.TESTFN, uid_1, gid)
1337 uid = os.stat(support.TESTFN).st_uid
1338 self.assertEqual(uid, uid_1)
1339 os.chown(support.TESTFN, uid_2, gid)
1340 uid = os.stat(support.TESTFN).st_uid
1341 self.assertEqual(uid, uid_2)
1342
@unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                     "test needs non-root account and more than one user")
def test_chown_without_permission(self):
    """Changing the owner of TESTFN must raise PermissionError."""
    first, second = all_users[:2]
    group = os.stat(support.TESTFN).st_gid
    with self.assertRaises(PermissionError):
        # The second call only runs if the first one did not raise
        # (presumably when the first uid already owns the file - confirm).
        for candidate in (first, second):
            os.chown(support.TESTFN, candidate, group)
1351
@classmethod
def tearDownClass(cls):
    """Remove the directory at support.TESTFN after the class has run."""
    scratch_dir = support.TESTFN
    os.rmdir(scratch_dir)
1355
1356
class RemoveDirsTests(unittest.TestCase):
    """Exercise os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_chain(self):
        """Create TESTFN/dira/dirb and return the two paths (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        outer, inner = self._make_chain()
        os.removedirs(inner)
        for path in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(path))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb has been deleted.
        outer, inner = self._make_chain()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for path in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))

    def test_remove_nothing(self):
        # A file in dirb makes the very first removal fail with OSError.
        outer, inner = self._make_chain()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for path in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))
1396
1397
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # Unbuffered binary write, followed by an explicit close.
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001405
Andrew Svetlov405faed2012-12-25 12:18:09 +02001406
Guido van Rossume7ba4952007-06-06 23:52:48 +00001407class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001408 def test_urandom_length(self):
1409 self.assertEqual(len(os.urandom(0)), 0)
1410 self.assertEqual(len(os.urandom(1)), 1)
1411 self.assertEqual(len(os.urandom(10)), 10)
1412 self.assertEqual(len(os.urandom(100)), 100)
1413 self.assertEqual(len(os.urandom(1000)), 1000)
1414
1415 def test_urandom_value(self):
1416 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001417 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001418 data2 = os.urandom(16)
1419 self.assertNotEqual(data1, data2)
1420
1421 def get_urandom_subprocess(self, count):
1422 code = '\n'.join((
1423 'import os, sys',
1424 'data = os.urandom(%s)' % count,
1425 'sys.stdout.buffer.write(data)',
1426 'sys.stdout.buffer.flush()'))
1427 out = assert_python_ok('-c', code)
1428 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001429 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001430 return stdout
1431
1432 def test_urandom_subprocess(self):
1433 data1 = self.get_urandom_subprocess(16)
1434 data2 = self.get_urandom_subprocess(16)
1435 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001436
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001437
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom(); skipped when the syscall is unavailable."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1479
1480
# os.urandom() needs no file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the corresponding build-time variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001487
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario is run in a child interpreter through assert_python_ok(),
    so descriptor limits and closed descriptors never affect the test runner.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open its random
        # device.  We spawn a new process to make the test more robust (if
        # the file descriptor limit could not be restored after this, the
        # whole test suite would crash; this actually happened on the OS X
        # Tiger buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful run of the child matters; its output is unused.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        # Predictable contents: if urandom() read from this file instead of
        # the random device, both 4-byte reads would be identical.
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # A second child must produce different output from the first one.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1567
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001568
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001569@contextlib.contextmanager
1570def _execvpe_mockup(defpath=None):
1571 """
1572 Stubs out execv and execve functions when used as context manager.
1573 Records exec calls. The mock execv and execve functions always raise an
1574 exception as they would normally never return.
1575 """
1576 # A list of tuples containing (function name, first arg, args)
1577 # of calls to execv or execve that have been made.
1578 calls = []
1579
1580 def mock_execv(name, *args):
1581 calls.append(('execv', name, args))
1582 raise RuntimeError("execv called")
1583
1584 def mock_execve(name, *args):
1585 calls.append(('execve', name, args))
1586 raise OSError(errno.ENOTDIR, "execve called")
1587
1588 try:
1589 orig_execv = os.execv
1590 orig_execve = os.execve
1591 orig_defpath = os.defpath
1592 os.execv = mock_execv
1593 os.execve = mock_execve
1594 if defpath is not None:
1595 os.defpath = defpath
1596 yield calls
1597 finally:
1598 os.execv = orig_execv
1599 os.execve = orig_execve
1600 os.defpath = orig_defpath
1601
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Argument validation and lookup behaviour of the os.exec*() family."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        os.execv and os.execve are replaced by the recording stubs from
        _execvpe_mockup(), so no program is actually executed.
        """
        wants_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if wants_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if wants_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside of Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            winerror = exc.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1706
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001707
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each os function below must raise OSError for the given TESTFN state."""

    def setUp(self):
        # Every test relies on support.TESTFN being absent at start.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the path does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create TESTFN as a regular file ("x" fails if it already exists);
        # mkdir on the same path must then raise OSError.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001742
Victor Stinnere77c9742016-03-25 10:28:23 +01001743
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001744class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001745 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001746 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1747 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001748 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001749 def get_single(f):
1750 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001751 if hasattr(os, f):
1752 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001753 return helper
1754 for f in singles:
1755 locals()["test_"+f] = get_single(f)
1756
Benjamin Peterson7522c742009-01-19 21:00:09 +00001757 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001758 try:
1759 f(support.make_bad_fd(), *args)
1760 except OSError as e:
1761 self.assertEqual(e.errno, errno.EBADF)
1762 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001763 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001764 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001765
Serhiy Storchaka43767632013-11-03 21:31:38 +02001766 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001767 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001768 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001769
Serhiy Storchaka43767632013-11-03 21:31:38 +02001770 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001771 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001772 fd = support.make_bad_fd()
1773 # Make sure none of the descriptors we are about to close are
1774 # currently valid (issue 6542).
1775 for i in range(10):
1776 try: os.fstat(fd+i)
1777 except OSError:
1778 pass
1779 else:
1780 break
1781 if i < 2:
1782 raise unittest.SkipTest(
1783 "Unable to acquire a range of invalid file descriptors")
1784 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001785
Serhiy Storchaka43767632013-11-03 21:31:38 +02001786 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001787 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001788 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001789
Serhiy Storchaka43767632013-11-03 21:31:38 +02001790 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001791 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001792 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001793
Serhiy Storchaka43767632013-11-03 21:31:38 +02001794 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001795 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001796 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001797
Serhiy Storchaka43767632013-11-03 21:31:38 +02001798 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001799 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001800 self.check(os.pathconf, "PC_NAME_MAX")
1801 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001802
Serhiy Storchaka43767632013-11-03 21:31:38 +02001803 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001804 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001805 self.check(os.truncate, 0)
1806 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001807
Serhiy Storchaka43767632013-11-03 21:31:38 +02001808 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001809 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001810 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001811
Serhiy Storchaka43767632013-11-03 21:31:38 +02001812 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001813 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001814 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001815
Victor Stinner57ddf782014-01-08 15:21:28 +01001816 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1817 def test_readv(self):
1818 buf = bytearray(10)
1819 self.check(os.readv, [buf])
1820
Serhiy Storchaka43767632013-11-03 21:31:38 +02001821 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001822 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001823 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001824
Serhiy Storchaka43767632013-11-03 21:31:38 +02001825 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001826 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001827 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001828
Victor Stinner57ddf782014-01-08 15:21:28 +01001829 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1830 def test_writev(self):
1831 self.check(os.writev, [b'abc'])
1832
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001833 def test_inheritable(self):
1834 self.check(os.get_inheritable)
1835 self.check(os.set_inheritable, True)
1836
1837 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1838 'needs os.get_blocking() and os.set_blocking()')
1839 def test_blocking(self):
1840 self.check(os.get_blocking)
1841 self.check(os.set_blocking, True)
1842
Brian Curtin1b9df392010-11-24 20:24:31 +00001843
class LinkTests(unittest.TestCase):
    """Create hard links with os.link() using str, bytes and non-ASCII names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Link *file2* to a freshly created *file1* and compare open files."""
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with open(file1, "r") as source, open(file2, "r") as linked:
            same = os.path.sameopenfile(source.fileno(), linked.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Extend the base name with a non-ASCII character for both files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1880
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1882class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001883 # uid_t and gid_t are 32-bit unsigned integers on Linux
1884 UID_OVERFLOW = (1 << 32)
1885 GID_OVERFLOW = (1 << 32)
1886
Serhiy Storchaka43767632013-11-03 21:31:38 +02001887 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1888 def test_setuid(self):
1889 if os.getuid() != 0:
1890 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001891 self.assertRaises(TypeError, os.setuid, 'not an int')
1892 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001893
Serhiy Storchaka43767632013-11-03 21:31:38 +02001894 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1895 def test_setgid(self):
1896 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1897 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001898 self.assertRaises(TypeError, os.setgid, 'not an int')
1899 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001900
Serhiy Storchaka43767632013-11-03 21:31:38 +02001901 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1902 def test_seteuid(self):
1903 if os.getuid() != 0:
1904 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001905 self.assertRaises(TypeError, os.setegid, 'not an int')
1906 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001907
Serhiy Storchaka43767632013-11-03 21:31:38 +02001908 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1909 def test_setegid(self):
1910 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1911 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001912 self.assertRaises(TypeError, os.setegid, 'not an int')
1913 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001914
Serhiy Storchaka43767632013-11-03 21:31:38 +02001915 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1916 def test_setreuid(self):
1917 if os.getuid() != 0:
1918 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001919 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1920 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1921 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1922 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001923
Serhiy Storchaka43767632013-11-03 21:31:38 +02001924 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1925 def test_setreuid_neg1(self):
1926 # Needs to accept -1. We run this in a subprocess to avoid
1927 # altering the test runner's process state (issue8045).
1928 subprocess.check_call([
1929 sys.executable, '-c',
1930 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001931
Serhiy Storchaka43767632013-11-03 21:31:38 +02001932 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1933 def test_setregid(self):
1934 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1935 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001936 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
1937 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
1938 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
1939 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001940
Serhiy Storchaka43767632013-11-03 21:31:38 +02001941 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1942 def test_setregid_neg1(self):
1943 # Needs to accept -1. We run this in a subprocess to avoid
1944 # altering the test runner's process state (issue8045).
1945 subprocess.check_call([
1946 sys.executable, '-c',
1947 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001948
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """File-system calls on a directory holding non-ASCII file names."""

    def setUp(self):
        # Use the first non-empty candidate name for the test directory.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; TESTFN_UNICODE is always tried, the two
        # others only when they are set.
        candidates = [support.TESTFN_UNICODE]
        for optional in (support.TESTFN_UNENCODABLE, support.TESTFN_NONASCII):
            if optional:
                candidates.append(optional)

        # Keep the names that os.fsencode() can encode.
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw))
                decoded = os.fsdecode(raw)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Whatever went wrong, do not leave the directory behind.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        start_dir = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(start_dir)

    def test_open(self):
        for name in self.unicodefn:
            handle = open(os.path.join(self.dir, name), 'rb')
            handle.close()

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            fullname = os.path.join(self.dir, name)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002020
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll the pipe up to max_tries times, 0.1 second apart.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without seeing the message: either
            # the tries ran out or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # signals readiness through a shared one-byte memory map, send it
        # *event* with os.kill() and check that the process stops.  *name*
        # is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if it never became ready.  The
            # loop also ends when the child has already exited, in which
            # case there is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a
        # truthiness test would mistake an exit code of 0 for "running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2135
2136
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and FILE0,
        # FILE1 (files); self.created_paths holds the sorted entry names.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listed, list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        base = os.path.abspath(support.TESTFN)

        # unicode
        text_path = '\\\\?\\' + base
        self.assertEqual(sorted(os.listdir(text_path)), self.created_paths)

        # bytes
        bytes_path = b'\\\\?\\' + os.fsencode(base)
        self.assertEqual(
            sorted(os.listdir(bytes_path)),
            list(map(os.fsencode, self.created_paths)))
Tim Golden781bbeb2013-10-25 20:24:06 +01002183
2184
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    # Link name and link target, each as str and as bytes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # Preconditions: the targets exist, the link names are free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on a regular file raises OSError, whether the path is
        # given as str or as a path-like object.
        wrapped_target = FakePath(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(wrapped_target)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        wrapped_missing = FakePath('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(wrapped_missing)

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        result = os.readlink(FakePath(self.filelink))
        self.assertEqual(result, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        result = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(result, self.filelinkb_target)
        self.assertIsInstance(result, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        result = os.readlink(self.filelinkb)
        self.assertEqual(result, self.filelinkb_target)
        self.assertIsInstance(result, bytes)
2230
2231
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and the link names are free.
        # unittest assertions are used instead of bare ``assert`` so that
        # the checks still run when Python is started with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the target of this link does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() of the link must equal stat() of the target, while lstat()
        # of the link must differ; checked with str and with bytes paths.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from the parent of the link's directory
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a subdirectory of the link's directory
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Try the str arguments first, then the bytes ones, since that
            # is a separate code path.
            variants = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    pass
                else:
                    # Best-effort removal of whatever was created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002391
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and the junction name is free.
        # unittest assertions are used instead of bare ``assert`` so that
        # the checks still run when Python is started with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2421
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Tests that call into the nt module directly."""

    def test_getfinalpathname_handles(self):
        # The number of handles held by this process must be the same
        # before and after repeated nt._getfinalpathname()/os.stat() calls.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        counter = ctypes.wintypes.DWORD()

        def read_handle_count():
            # Query the handle count of this process, failing the test if
            # the API call itself reports failure.
            ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(counter))
            self.assertEqual(1, ok)
            return counter.value

        before_count = read_handle_count()

        # The leading device-style names are meant to exercise the error
        # path; __file__ exercises the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected for some names; only the handle
                # count matters here, so errors are deliberately ignored.
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        pass

        handle_delta = read_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002471
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion rather than a bare ``assert``: the latter
        # is stripped under -O, which would leave this test checking nothing.
        self.assertTrue(os.path.isdir(src))
2502
2503
class FSEncodingTests(unittest.TestCase):
    """Sanity checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() must come
        # back unchanged
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # this name cannot be encoded here; nothing to check
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002517
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002518
Brett Cannonefb00c02012-02-29 18:31:31 -05002519
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here, so an encoding name must be reported and it
        # must be one that the codecs machinery can look up.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2533
2534
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002535class PidTests(unittest.TestCase):
2536 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2537 def test_getppid(self):
2538 p = subprocess.Popen([sys.executable, '-c',
2539 'import os; print(os.getppid())'],
2540 stdout=subprocess.PIPE)
2541 stdout, _ = p.communicate()
2542 # We are the parent of our subprocess
2543 self.assertEqual(int(stdout), os.getpid())
2544
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002545 def test_waitpid(self):
2546 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002547 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002548 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002549 status = os.waitpid(pid, 0)
2550 self.assertEqual(status, (pid, 0))
2551
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002552
Victor Stinner4659ccf2016-09-14 10:57:00 +02002553class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002554 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002555 self.exitcode = 17
2556
2557 filename = support.TESTFN
2558 self.addCleanup(support.unlink, filename)
2559
2560 if not with_env:
2561 code = 'import sys; sys.exit(%s)' % self.exitcode
2562 else:
2563 self.env = dict(os.environ)
2564 # create an unique key
2565 self.key = str(uuid.uuid4())
2566 self.env[self.key] = self.key
2567 # read the variable from os.environ to check that it exists
2568 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2569 % (self.key, self.exitcode))
2570
2571 with open(filename, "w") as fp:
2572 fp.write(code)
2573
Berker Peksag81816462016-09-15 20:19:47 +03002574 args = [sys.executable, filename]
2575 if use_bytes:
2576 args = [os.fsencode(a) for a in args]
2577 self.env = {os.fsencode(k): os.fsencode(v)
2578 for k, v in self.env.items()}
2579
2580 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002581
Berker Peksag4af23d72016-09-15 20:32:44 +03002582 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002583 def test_spawnl(self):
2584 args = self.create_args()
2585 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2586 self.assertEqual(exitcode, self.exitcode)
2587
Berker Peksag4af23d72016-09-15 20:32:44 +03002588 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002589 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002590 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002591 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2592 self.assertEqual(exitcode, self.exitcode)
2593
Berker Peksag4af23d72016-09-15 20:32:44 +03002594 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002595 def test_spawnlp(self):
2596 args = self.create_args()
2597 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2598 self.assertEqual(exitcode, self.exitcode)
2599
Berker Peksag4af23d72016-09-15 20:32:44 +03002600 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002601 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002602 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002603 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2604 self.assertEqual(exitcode, self.exitcode)
2605
Berker Peksag4af23d72016-09-15 20:32:44 +03002606 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002607 def test_spawnv(self):
2608 args = self.create_args()
2609 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2610 self.assertEqual(exitcode, self.exitcode)
2611
Berker Peksag4af23d72016-09-15 20:32:44 +03002612 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002613 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002614 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002615 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2616 self.assertEqual(exitcode, self.exitcode)
2617
Berker Peksag4af23d72016-09-15 20:32:44 +03002618 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002619 def test_spawnvp(self):
2620 args = self.create_args()
2621 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2622 self.assertEqual(exitcode, self.exitcode)
2623
Berker Peksag4af23d72016-09-15 20:32:44 +03002624 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002625 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002626 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002627 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2628 self.assertEqual(exitcode, self.exitcode)
2629
Berker Peksag4af23d72016-09-15 20:32:44 +03002630 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002631 def test_nowait(self):
2632 args = self.create_args()
2633 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2634 result = os.waitpid(pid, 0)
2635 self.assertEqual(result[0], pid)
2636 status = result[1]
2637 if hasattr(os, 'WIFEXITED'):
2638 self.assertTrue(os.WIFEXITED(status))
2639 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2640 else:
2641 self.assertEqual(status, self.exitcode << 8)
2642
Berker Peksag4af23d72016-09-15 20:32:44 +03002643 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002644 def test_spawnve_bytes(self):
2645 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2646 args = self.create_args(with_env=True, use_bytes=True)
2647 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2648 self.assertEqual(exitcode, self.exitcode)
2649
Steve Dower859fd7b2016-11-19 18:53:19 -08002650 @requires_os_func('spawnl')
2651 def test_spawnl_noargs(self):
2652 args = self.create_args()
2653 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002654 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002655
2656 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002657 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002658 args = self.create_args()
2659 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002660 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002661
2662 @requires_os_func('spawnv')
2663 def test_spawnv_noargs(self):
2664 args = self.create_args()
2665 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2666 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002667 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2668 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002669
2670 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002671 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002672 args = self.create_args()
2673 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2674 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002675 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2676 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002677
def _test_invalid_env(self, spawn):
    """Check how *spawn* copes with unusual environment entries.

    *spawn* is called as ``spawn(mode, path, args, env)``.  For each
    malformed entry it may either raise ValueError or return exit code
    127.  An '=' inside a *value* is legal and must reach the child
    process unchanged.
    """
    command = [sys.executable, '-c', 'pass']

    malformed_entries = (
        # null character in the environment variable name
        ("FRUIT\0VEGETABLE", "cabbage"),
        # null character in the environment variable value
        ("FRUIT", "orange\0VEGETABLE=cabbage"),
        # equal character in the environment variable name
        ("FRUIT=ORANGE", "lemon"),
    )
    for name, value in malformed_entries:
        env = os.environ.copy()
        env[name] = value
        try:
            status = spawn(os.P_WAIT, command[0], command, env)
        except ValueError:
            # Rejected before anything was spawned: acceptable outcome.
            continue
        self.assertEqual(status, 127)

    # equal character in the environment variable value
    script = support.TESTFN
    self.addCleanup(support.unlink, script)
    with open(script, "w") as fp:
        fp.write('import sys, os\n'
                 'if os.getenv("FRUIT") != "orange=lemon":\n'
                 ' raise AssertionError')
    command = [sys.executable, script]
    env = os.environ.copy()
    env["FRUIT"] = "orange=lemon"
    status = spawn(os.P_WAIT, command[0], command, env)
    self.assertEqual(status, 0)
2723
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnve().
    spawn = os.spawnve
    self._test_invalid_env(spawn)
2727
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnvpe().
    spawn = os.spawnvpe
    self._test_invalid_env(spawn)
2731
Serhiy Storchaka77703942017-06-25 07:33:01 +03002732
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Smoke test for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must be a non-empty value.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2741
2742
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise the nice value of this process by one, read it back,
        # then try to restore the starting value.
        which = os.PRIO_PROCESS
        who = os.getpid()

        base = os.getpriority(which, who)
        os.setpriority(which, who, base + 1)
        try:
            new_prio = os.getpriority(which, who)
            if base >= 19 and new_prio <= 19:
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Restoring is best effort: EACCES is tolerated, any other
            # error is propagated.
            try:
                os.setpriority(which, who, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2765
2766
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening asyncore dispatcher that runs its event loop in a thread.

    Each accepted connection is wrapped in a Handler, which greets the
    peer with b"220 ready\\r\\n" and records what it receives.  The most
    recent handler is available as ``handler_instance``.
    """

    class Handler(asynchat.async_chat):
        """Channel for one accepted connection; buffers received data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When set to False, incoming data is read and thrown away.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        """Bind to *address* and start listening; the thread is not started."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to exit and wait for the thread to end."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # 'with' guarantees the lock is released even when a handler
            # re-raises out of asyncore.loop().
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        # NOTE(review): presumably not expected on a listening socket;
        # the dispatcher is simply closed if it happens.
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2851
2852
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() using a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Every test gets a fresh
    server, a client socket connected to it and an open handle on that
    file; the data the server received is then compared with what was
    sent.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # Platforms on which these tests do not exercise headers/trailers.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')
    requires_32b = unittest.skipUnless(
        sys.maxsize < 2**32, 'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or
        EBUSY; any other OSError (including ECONNRESET, i.e. the peer
        disconnected) is propagated to the caller.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a reserved word, hence the dict unpacking).
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 bytes of the file are requested, followed
            # by the two trailers.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are tolerated; any other error is a failure.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003057
3058
def supports_extended_attributes():
    """Return True if a "user." extended attribute can be set on a file.

    Returns False when os.setxattr is missing or when setting the
    attribute on a freshly created support.TESTFN raises OSError.  The
    probe file is removed in all cases.
    """
    if not hasattr(os, "setxattr"):
        return False

    probe = support.TESTFN
    try:
        with open(probe, "xb", 0) as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                supported = False
            else:
                supported = True
    finally:
        support.unlink(probe)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07003073
3074
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario with attribute names converted by *s*.

        *s* turns a str attribute name into the type under test (str or
        bytes).  The four callables take the file name first; **kwargs
        is forwarded to every call except listxattr().
        """
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        first = s("user.test")
        second = s("user.test2")

        def expect_errno(code, func, *args):
            # func(*args, **kwargs) must raise OSError with errno *code*.
            with self.assertRaises(OSError) as cm:
                func(*args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        # Nothing is set on a fresh file.
        expect_errno(errno.ENODATA, getxattr, fn, first)

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Create the attribute empty, then replace its value.
        setxattr(fn, first, b"", **kwargs)
        expected_names = set(init_xattr)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        expect_errno(errno.EEXIST,
                     setxattr, fn, first, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, fn, second, b"bye", os.XATTR_REPLACE)

        setxattr(fn, second, b"foo", os.XATTR_CREATE, **kwargs)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected_names)
        removexattr(fn, first, **kwargs)

        # The removed attribute is gone; the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, fn, first)

        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, second, **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(fn, first, big_value, **kwargs)
        self.assertEqual(getxattr(fn, first, **kwargs), big_value)
        removexattr(fn, first, **kwargs)

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then with bytes names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call receives a file descriptor opened
        # on the path instead of the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3158
3159
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _skip_if_not_queryable(self, exc):
        """Skip the test when *exc* only means the size cannot be queried.

        Returns normally for any other OSError so that the caller can
        re-raise it.
        """
        if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            self._skip_if_not_queryable(e)
            raise

        for dimension in (size.columns, size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            size = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        lines, columns = size[0], size[1]
        expected = (int(columns), int(lines))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            self._skip_if_not_queryable(e)
            raise
        self.assertEqual(expected, actual)
3203
3204
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    """Tests for os.memfd_create()."""

    def test_memfd_create(self):
        payload = b'memfd_create'

        # With an explicit MFD_CLOEXEC flag: the descriptor is valid,
        # non-inheritable and writable.
        flagged_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged_fd, -1)
        self.addCleanup(os.close, flagged_fd)
        self.assertFalse(os.get_inheritable(flagged_fd))
        with open(flagged_fd, "wb", closefd=False) as stream:
            stream.write(payload)
            self.assertEqual(stream.tell(), 12)

        # With default flags the descriptor is non-inheritable as well.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3220
3221
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: a plain str and a str subclass.
        if support.TESTFN_UNENCODABLE is not None:
            decoded = support.TESTFN_UNENCODABLE
        else:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Bytes-like names: bytes, bytearray and memoryview.
        if support.TESTFN_UNDECODABLE is not None:
            encoded = support.TESTFN_UNDECODABLE
        else:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [
            encoded,
            bytearray(encoded),
            memoryview(encoded),
        ]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each entry is (names to try, function, *extra positional args).
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((as_text, os.readlink,))
            else:
                funcs.append((every, os.readlink,))

        for names, func, *extra in funcs:
            for name in names:
                try:
                    if isinstance(name, (str, bytes)):
                        func(name, *extra)
                    else:
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3317
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (test is skipped) or a
        # positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3326
Victor Stinnerdaf45552013-08-28 00:53:59 +02003327
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        """Open this source file read-only; the fd is closed on cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        cloexec = fcntl.FD_CLOEXEC
        fd = self._open_this_file()
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, cloexec)

        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, source_fd)
        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(fd, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        target = self._open_this_file()
        self.assertEqual(os.dup2(fd, target, inheritable=False), target)
        self.assertFalse(os.get_inheritable(target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for pty_fd in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(pty_fd), False)
3415
3416
class PathTConverterTests(unittest.TestCase):
    # Exercises the path-argument conversion of several os functions with
    # str, bytes, path-like wrappers and raw file descriptors.
    # NOTE(review): FakePath is defined elsewhere in this file; it is assumed
    # to return the wrapped value from __fspath__() -- confirm there.

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows.
            bytes_fspath = bytes_filename = None
        else:
            # Use the filesystem encoding instead of a hard-coded 'ascii'
            # codec so that a non-ASCII TESTFN does not abort the test with
            # UnicodeEncodeError before anything is checked.
            bytes_filename = os.fsencode(support.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the fd first, then unlink.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like whose __fspath__() yields an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)

    def test_path_t_converter_and_custom_class(self):
        # The error message must name the offending __fspath__() return type.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        with self.assertRaisesRegex(TypeError, msg % r'int'):
            os.stat(FakePath(2))
        with self.assertRaisesRegex(TypeError, msg % r'float'):
            os.stat(FakePath(2.34))
        with self.assertRaisesRegex(TypeError, msg % r'object'):
            os.stat(FakePath(object()))
3481
Brett Cannon3f9183b2016-08-26 14:44:48 -07003482
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip check for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor starts out in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the mode after each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3496
3497
Yury Selivanov97e2e062014-09-26 12:33:06 -04003498
class ExportsTests(unittest.TestCase):
    """Spot-check the names published in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for expected_name in ('open', 'walk'):
            self.assertIn(expected_name, exported)
3503
3504
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test works inside a scratch directory created by setUp() and
    removed again on cleanup.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        A bytes *name* is joined to the bytes form of the directory, so the
        returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return {entry.name: entry}.

        Asserts that the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results match.

        With *skip_fields* true, the st_* attributes are compared one by one
        and st_dev, st_ino and st_nlink are left out; otherwise the two
        results are compared as a whole.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Validate *entry* against os.stat() and the caller's expectations.

        *is_dir* and *is_file* are the expected results when following
        symlinks; *is_symlink* is the expected entry.is_symlink() value.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Also enforce what the caller said the entry should be, so a wrong
        # answer shared by DirEntry and os.stat() cannot go unnoticed.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows some stat fields are skipped in the comparison (see
        # assert_stat_equal), except for a followed symlink.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        Asserts that exactly one entry exists and that it is called *name*.
        A bytes *name* makes the scan use the bytes directory path.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry that scandir reports."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(self.bytes_path, bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing after it runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning an fd, path is expected to equal the bare name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        # Dropping a partially consumed, unclosed iterator must warn.
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3837
Victor Stinner6036e442015-03-08 01:58:04 +01003838
Ethan Furmancdc08792016-06-02 15:06:09 -07003839class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003840
3841 # Abstracted so it can be overridden to test pure Python implementation
3842 # if a C version is provided.
3843 fspath = staticmethod(os.fspath)
3844
Ethan Furmancdc08792016-06-02 15:06:09 -07003845 def test_return_bytes(self):
3846 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003847 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003848
3849 def test_return_string(self):
3850 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003851 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003852
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003853 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003854 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003855 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003856
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003857 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003858 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3859 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3860
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003861 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003862 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3863 self.assertTrue(issubclass(FakePath, os.PathLike))
3864 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003865
Ethan Furmancdc08792016-06-02 15:06:09 -07003866 def test_garbage_in_exception_out(self):
3867 vapor = type('blah', (), {})
3868 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003869 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003870
3871 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003872 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003873
Brett Cannon044283a2016-07-15 10:41:49 -07003874 def test_bad_pathlike(self):
3875 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003876 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003877 # __fspath__ attribute that is not callable.
3878 c = type('foo', (), {})
3879 c.__fspath__ = 1
3880 self.assertRaises(TypeError, self.fspath, c())
3881 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003882 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003883 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003884
Victor Stinnerc29b5852017-11-02 07:28:27 -07003885
class TimesTests(unittest.TestCase):
    """Checks the type and fields of the os.times() result."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every field must be reported as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(result, field_name), float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            self.assertEqual(result.children_user, 0)
            self.assertEqual(result.children_system, 0)
            self.assertEqual(result.elapsed, 0)
3900
3901
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
# The subclass is defined only when os exposes an "_fspath" attribute; when
# it is absent, no extra test class exists at all.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Overrides the base-class hook so every inherited test calls
        # os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003910
3911
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()