blob: 4c620ccae9c84ceeb1859eaa138e3eeeabbaf6bf [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
# Optional modules: when the import fails the name is bound to None instead.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
try:
    import grp
    # Group ids of every group that lists the current user as a member ...
    groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
    # ... plus the gid of this process, when os.getgid() is available.
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    # Only a failed import is tolerated here; any other error raised while
    # building the list propagates.
    groups = []
try:
    import pwd
    # uid of every entry returned by pwd.getpwall().
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    # Either pwd cannot be imported or an attribute lookup in the try body
    # failed (NOTE(review): presumably a pwd module without getpwall() --
    # confirm which platform needs this).
    all_users = []
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    # Without _testcapi both limits fall back to sys.maxsize.
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020064from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
# True only when os.geteuid() exists and reports the superuser.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    sys.platform.startswith('freebsd')
    and os.getgid() == 0
)
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
86
87
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation binary mode without buffering,
    so the call raises FileExistsError when *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as unbuffered:
        unbuffered.write(content)
91
92
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # setUp and tearDown are the same function: both remove support.TESTFN
    # so that every test starts and ends without it.
    def setUp(self):
        # lexists() rather than exists(): test_symlink_keywords may create a
        # symlink at TESTFN whose target ('target') is never created.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must leave the reference count of its
        # argument unchanged.  Do not introduce extra references to `path`
        # between the two getrefcount() calls.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            # os.read() must return exactly bytes, not a subclass.
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return fewer bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or a write fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap it with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists: replace() must overwrite it and
        # the source name must disappear.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every parameter of os.open() can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every parameter of os.symlink() can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
232
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200233
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    def setUp(self):
        # Every test works on a 3-byte file that is removed on cleanup.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path).

        Covers index and attribute access, agreement between the float and
        nanosecond timestamps, immutability of the result, and the
        stat_result constructor with tuples of the wrong length.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            # Check presence first so that a missing field is reported as a
            # test failure rather than as an AttributeError.
            self.assertIn(attr, members)
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Attribute access yields a float timestamp while indexing
                # yields the integer one.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the cases above, the absence of an exception
        # is not treated as a failure here -- presumably the constructor
        # accepts extra items on some platforms; confirm before tightening.
        try:
            os.stat_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                # The class must be pickled as os.stat_result.
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): as for stat_result, no exception is tolerated here;
        # confirm whether that leniency is still required.
        try:
            os.statvfs_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                # The class must be pickled as os.statvfs_result.
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has st_file_attributes as an unsigned 32-bit int."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
453
Victor Stinner47aacc82015-06-12 17:26:23 +0200454
class UtimeTests(unittest.TestCase):
    def setUp(self):
        # Each test gets a fresh directory (TESTFN) containing one file "f1";
        # the whole tree is removed on cleanup.
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        """Return True if any timestamp of *filename* has a fractional part."""
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        """Set known timestamps through *set_time* and verify them with stat.

        *set_time* is called as set_time(filename, (atime_ns, mtime_ns)).
        *filename* defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        subsecond = self.support_subsecond(filename)
        if subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Call set_time(self.fname) and check st_mtime is close to now."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of the volume holding *path*.

        Only implemented for Windows (via GetVolumeInformationW); returns
        None on other platforms or when the call fails.
        """
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        # times and ns must each be a tuple of exactly two items
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # Optional features must be rejected where they are not supported
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            # Object whose __divmod__ returns an arbitrary value instead of
            # a (quotient, remainder) pair.
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
675
Victor Stinner47aacc82015-06-12 17:26:23 +0200676
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000677from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown() can restore it verbatim,
        # then seed it with the reference mapping used by the inherited
        # mapping-protocol tests.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        # A value set through os.environ.update() must be visible to a
        # child shell process.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        # The object returned by os.popen() must be iterable line by line.
        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
                      % unix_shell) as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself (not just its contents) and
        # always put the real object back.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

    def test_key_type(self):
        # The KeyError raised for a missing key must carry the original key
        # object and must not expose an internal exception as its context.
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Shared helper: start iterating over *collection* (os.environ or
        # one of its views), add a key to os.environ mid-iteration, and
        # check that advancing the iterator still works.
        iterator = iter(collection)
        new_key = "__new_key__"

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
857
Victor Stinner6d101392013-04-14 16:35:04 +0200858
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to tests both functions with the same code base
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # NOTE: the file shown as "tmp5" in the diagram above is created on
        # disk under the name "tmp3".
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Make SUB21 unreadable.  If it can still be listed (the chmod had
        # no effect for this process), remove SUB21 entirely and drop it
        # from the expected tree so the walk results stay predictable.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        # Re-run the prune test with a path-like object as the top.
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 after it has been listed but before the walk descends
        # into it, so the walk hits a directory that no longer exists.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001032
Charles-François Natali7372b062012-02-05 15:15:38 +01001033
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests expect.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        # Work on copies so the caller's dicts are not mutated.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1101
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            # Hand str values to the inherited tests ...
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files)
            # ... then copy any in-place edits the test made (e.g. pruning
            # 'dirs') back into the bytes lists that os.walk() consults.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1114
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            # Hand str values to the inherited tests ...
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # ... then copy any in-place edits the test made back into the
            # bytes lists that os.fwalk() consults.
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1126
Charles-François Natali7372b062012-02-05 15:15:38 +01001127
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        # The explicit mode applies to the leaf directory only; the
        # intermediate directory gets the default mode filtered by the umask.
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even if an assertion below fails, so a
        # failure here cannot leak a modified umask into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        # makedirs() must fail when the target exists but is a regular
        # file, regardless of exist_ok.
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1220
Andrew Svetlov405faed2012-12-25 12:18:09 +02001221
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named 'st' rather than 'stat' so the local does not shadow the
        # imported stat module.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # Non-integer numeric types must be rejected for both uid and gid.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        os.chown(support.TESTFN, uid, gid_1)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_1)
        os.chown(support.TESTFN, uid, gid_2)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_2)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        os.chown(support.TESTFN, uid_1, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_1)
        os.chown(support.TESTFN, uid_2, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_2)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1274
1275
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def test_remove_all(self):
        # With nothing but empty directories, removedirs() prunes the leaf
        # and every parent, including TESTFN itself.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertFalse(os.path.exists(dira))
        self.assertFalse(os.path.exists(support.TESTFN))

    def test_remove_partial(self):
        # A file in 'dira' stops the upward pruning there: only the leaf
        # 'dirb' is removed.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # A file in the leaf itself makes removedirs() fail and leaves the
        # whole tree in place.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self.assertTrue(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))
1315
1316
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with statement closes
        # the file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from it yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001324
Andrew Svetlov405faed2012-12-25 12:18:09 +02001325
Guido van Rossume7ba4952007-06-06 23:52:48 +00001326class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001327 def test_urandom_length(self):
1328 self.assertEqual(len(os.urandom(0)), 0)
1329 self.assertEqual(len(os.urandom(1)), 1)
1330 self.assertEqual(len(os.urandom(10)), 10)
1331 self.assertEqual(len(os.urandom(100)), 100)
1332 self.assertEqual(len(os.urandom(1000)), 1000)
1333
1334 def test_urandom_value(self):
1335 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001336 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001337 data2 = os.urandom(16)
1338 self.assertNotEqual(data1, data2)
1339
1340 def get_urandom_subprocess(self, count):
1341 code = '\n'.join((
1342 'import os, sys',
1343 'data = os.urandom(%s)' % count,
1344 'sys.stdout.buffer.write(data)',
1345 'sys.stdout.buffer.flush()'))
1346 out = assert_python_ok('-c', code)
1347 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001348 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001349 return stdout
1350
1351 def test_urandom_subprocess(self):
1352 data1 = self.get_urandom_subprocess(16)
1353 data2 = self.get_urandom_subprocess(16)
1354 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001355
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001356
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom(); skipped when the function is missing."""

    @classmethod
    def setUpClass(cls):
        # Probe the call once so the whole class can be skipped when the
        # function exists but the running kernel rejects it.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        # The result is a bytes object of the requested size.
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        # Asking for zero bytes gives an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only check that the flag exists.
        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive 16-byte reads must differ.
        first, second = os.getrandom(16), os.getrandom(16)
        self.assertNotEqual(first, second)
1398
1399
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as the build configuration reports any one of them.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(name) == 1
    for name in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001406
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter (assert_python_ok) because
    it manipulates process-wide file descriptor state.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if
        # setrlimit() failed to restore the file descriptor limit after
        # this, the whole test suite would crash; this actually happened on
        # the OS X Tiger buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.  The child only has to finish successfully; its output is
        # not inspected.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        # A file of constant bytes: if urandom() read from it by mistake,
        # the two 4-byte reads below would be identical.
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Within one run the two reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not repeat the first run's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1484
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001485
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001486@contextlib.contextmanager
1487def _execvpe_mockup(defpath=None):
1488 """
1489 Stubs out execv and execve functions when used as context manager.
1490 Records exec calls. The mock execv and execve functions always raise an
1491 exception as they would normally never return.
1492 """
1493 # A list of tuples containing (function name, first arg, args)
1494 # of calls to execv or execve that have been made.
1495 calls = []
1496
1497 def mock_execv(name, *args):
1498 calls.append(('execv', name, args))
1499 raise RuntimeError("execv called")
1500
1501 def mock_execve(name, *args):
1502 calls.append(('execve', name, args))
1503 raise OSError(errno.ENOTDIR, "execve called")
1504
1505 try:
1506 orig_execv = os.execv
1507 orig_execve = os.execve
1508 orig_defpath = os.defpath
1509 os.execv = mock_execv
1510 os.execve = mock_execve
1511 if defpath is not None:
1512 os.defpath = defpath
1513 yield calls
1514 finally:
1515 os.execv = orig_execv
1516 os.execve = orig_execve
1517 os.defpath = orig_defpath
1518
Victor Stinner4659ccf2016-09-14 10:57:00 +02001519
class ExecTests(unittest.TestCase):
    """Tests for os.execv(), os.execve(), os.execvpe() and os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program that cannot be found must be reported as OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list, or an empty first argument, is rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        # Same validation for execvpe(), whatever environment is passed.
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError,
                              os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with the real exec functions stubbed out and
        # check which stub was called with which arguments.  *test_type*
        # (str or bytes) selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        env = {'spam': 'beans'}
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        # Always run the str variant; the bytes variant is not run on "nt".
        kinds = [str]
        if os.name != "nt":
            kinds.append(bytes)
        for kind in kinds:
            self._test_internal_execvpe(kind)

    def test_execve_invalid_env(self):
        # Each malformed environment entry must make execve() raise
        # ValueError.
        args = [sys.executable, '-c', 'pass']
        bad_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for name, value in bad_entries:
            newenv = os.environ.copy()
            newenv[name] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        # The OSError must not carry a winerror of 0.
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            self.assertTrue(e.winerror is None or e.winerror != 0)
        else:
            self.fail('No OSError raised')
1622
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001623
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that failing file operations raise OSError on Windows.

    Apart from test_mkdir, every test operates on support.TESTFN while it
    does not exist.
    """

    def setUp(self):
        # The tests rely on support.TESTFN being absent: fail right away if
        # it exists, or if os.stat() fails for any other reason.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file and fails if it already exists; mkdir()
        # on the same name must then fail while the file is still open.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001658
Victor Stinnere77c9742016-03-25 10:28:23 +01001659
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001660class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001661 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1663 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001664 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001665 def get_single(f):
1666 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001667 if hasattr(os, f):
1668 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001669 return helper
1670 for f in singles:
1671 locals()["test_"+f] = get_single(f)
1672
Benjamin Peterson7522c742009-01-19 21:00:09 +00001673 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001674 try:
1675 f(support.make_bad_fd(), *args)
1676 except OSError as e:
1677 self.assertEqual(e.errno, errno.EBADF)
1678 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001679 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001680 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001681
Serhiy Storchaka43767632013-11-03 21:31:38 +02001682 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001683 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001684 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001685
Serhiy Storchaka43767632013-11-03 21:31:38 +02001686 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001687 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001688 fd = support.make_bad_fd()
1689 # Make sure none of the descriptors we are about to close are
1690 # currently valid (issue 6542).
1691 for i in range(10):
1692 try: os.fstat(fd+i)
1693 except OSError:
1694 pass
1695 else:
1696 break
1697 if i < 2:
1698 raise unittest.SkipTest(
1699 "Unable to acquire a range of invalid file descriptors")
1700 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001701
Serhiy Storchaka43767632013-11-03 21:31:38 +02001702 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001703 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001704 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001705
Serhiy Storchaka43767632013-11-03 21:31:38 +02001706 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001707 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001711 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001712 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001713
Serhiy Storchaka43767632013-11-03 21:31:38 +02001714 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001715 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001716 self.check(os.pathconf, "PC_NAME_MAX")
1717 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001718
Serhiy Storchaka43767632013-11-03 21:31:38 +02001719 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001720 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001721 self.check(os.truncate, 0)
1722 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001723
Serhiy Storchaka43767632013-11-03 21:31:38 +02001724 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001725 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001726 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001727
Serhiy Storchaka43767632013-11-03 21:31:38 +02001728 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001729 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001730 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001731
Victor Stinner57ddf782014-01-08 15:21:28 +01001732 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1733 def test_readv(self):
1734 buf = bytearray(10)
1735 self.check(os.readv, [buf])
1736
Serhiy Storchaka43767632013-11-03 21:31:38 +02001737 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001738 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001739 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001740
Serhiy Storchaka43767632013-11-03 21:31:38 +02001741 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001742 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001743 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001744
Victor Stinner57ddf782014-01-08 15:21:28 +01001745 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1746 def test_writev(self):
1747 self.check(os.writev, [b'abc'])
1748
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001749 def test_inheritable(self):
1750 self.check(os.get_inheritable)
1751 self.check(os.set_inheritable, True)
1752
1753 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1754 'needs os.get_blocking() and os.set_blocking()')
1755 def test_blocking(self):
1756 self.check(os.get_blocking)
1757 self.check(os.set_blocking, True)
1758
Brian Curtin1b9df392010-11-24 20:24:31 +00001759
1760class LinkTests(unittest.TestCase):
1761 def setUp(self):
1762 self.file1 = support.TESTFN
1763 self.file2 = os.path.join(support.TESTFN + "2")
1764
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001765 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001766 for file in (self.file1, self.file2):
1767 if os.path.exists(file):
1768 os.unlink(file)
1769
Brian Curtin1b9df392010-11-24 20:24:31 +00001770 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001771 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001772
xdegaye6a55d092017-11-12 17:57:04 +01001773 try:
1774 os.link(file1, file2)
1775 except PermissionError as e:
1776 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001777 with open(file1, "r") as f1, open(file2, "r") as f2:
1778 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1779
1780 def test_link(self):
1781 self._test_link(self.file1, self.file2)
1782
1783 def test_link_bytes(self):
1784 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1785 bytes(self.file2, sys.getfilesystemencoding()))
1786
Brian Curtinf498b752010-11-30 15:54:04 +00001787 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001788 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001789 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001790 except UnicodeError:
1791 raise unittest.SkipTest("Unable to encode for this platform.")
1792
Brian Curtinf498b752010-11-30 15:54:04 +00001793 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001794 self.file2 = self.file1 + "2"
1795 self._test_link(self.file1, self.file2)
1796
Serhiy Storchaka43767632013-11-03 21:31:38 +02001797@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1798class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001799 # uid_t and gid_t are 32-bit unsigned integers on Linux
1800 UID_OVERFLOW = (1 << 32)
1801 GID_OVERFLOW = (1 << 32)
1802
Serhiy Storchaka43767632013-11-03 21:31:38 +02001803 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1804 def test_setuid(self):
1805 if os.getuid() != 0:
1806 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001807 self.assertRaises(TypeError, os.setuid, 'not an int')
1808 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001809
Serhiy Storchaka43767632013-11-03 21:31:38 +02001810 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1811 def test_setgid(self):
1812 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1813 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001814 self.assertRaises(TypeError, os.setgid, 'not an int')
1815 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001816
Serhiy Storchaka43767632013-11-03 21:31:38 +02001817 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1818 def test_seteuid(self):
1819 if os.getuid() != 0:
1820 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001821 self.assertRaises(TypeError, os.setegid, 'not an int')
1822 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001823
Serhiy Storchaka43767632013-11-03 21:31:38 +02001824 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1825 def test_setegid(self):
1826 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1827 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001828 self.assertRaises(TypeError, os.setegid, 'not an int')
1829 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001830
Serhiy Storchaka43767632013-11-03 21:31:38 +02001831 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1832 def test_setreuid(self):
1833 if os.getuid() != 0:
1834 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001835 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1836 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1837 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1838 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001839
Serhiy Storchaka43767632013-11-03 21:31:38 +02001840 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1841 def test_setreuid_neg1(self):
1842 # Needs to accept -1. We run this in a subprocess to avoid
1843 # altering the test runner's process state (issue8045).
1844 subprocess.check_call([
1845 sys.executable, '-c',
1846 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001847
Serhiy Storchaka43767632013-11-03 21:31:38 +02001848 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1849 def test_setregid(self):
1850 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1851 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001852 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
1853 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
1854 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
1855 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001856
Serhiy Storchaka43767632013-11-03 21:31:38 +02001857 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1858 def test_setregid_neg1(self):
1859 # Needs to accept -1. We run this in a subprocess to avoid
1860 # altering the test runner's process state (issue8045).
1861 subprocess.check_call([
1862 sys.executable, '-c',
1863 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001864
Serhiy Storchaka43767632013-11-03 21:31:38 +02001865@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1866class Pep383Tests(unittest.TestCase):
1867 def setUp(self):
1868 if support.TESTFN_UNENCODABLE:
1869 self.dir = support.TESTFN_UNENCODABLE
1870 elif support.TESTFN_NONASCII:
1871 self.dir = support.TESTFN_NONASCII
1872 else:
1873 self.dir = support.TESTFN
1874 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001875
Serhiy Storchaka43767632013-11-03 21:31:38 +02001876 bytesfn = []
1877 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001878 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001879 fn = os.fsencode(fn)
1880 except UnicodeEncodeError:
1881 return
1882 bytesfn.append(fn)
1883 add_filename(support.TESTFN_UNICODE)
1884 if support.TESTFN_UNENCODABLE:
1885 add_filename(support.TESTFN_UNENCODABLE)
1886 if support.TESTFN_NONASCII:
1887 add_filename(support.TESTFN_NONASCII)
1888 if not bytesfn:
1889 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001890
Serhiy Storchaka43767632013-11-03 21:31:38 +02001891 self.unicodefn = set()
1892 os.mkdir(self.dir)
1893 try:
1894 for fn in bytesfn:
1895 support.create_empty_file(os.path.join(self.bdir, fn))
1896 fn = os.fsdecode(fn)
1897 if fn in self.unicodefn:
1898 raise ValueError("duplicate filename")
1899 self.unicodefn.add(fn)
1900 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001901 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001902 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001903
Serhiy Storchaka43767632013-11-03 21:31:38 +02001904 def tearDown(self):
1905 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001906
Serhiy Storchaka43767632013-11-03 21:31:38 +02001907 def test_listdir(self):
1908 expected = self.unicodefn
1909 found = set(os.listdir(self.dir))
1910 self.assertEqual(found, expected)
1911 # test listdir without arguments
1912 current_directory = os.getcwd()
1913 try:
1914 os.chdir(os.sep)
1915 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1916 finally:
1917 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001918
Serhiy Storchaka43767632013-11-03 21:31:38 +02001919 def test_open(self):
1920 for fn in self.unicodefn:
1921 f = open(os.path.join(self.dir, fn), 'rb')
1922 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001923
Serhiy Storchaka43767632013-11-03 21:31:38 +02001924 @unittest.skipUnless(hasattr(os, 'statvfs'),
1925 "need os.statvfs()")
1926 def test_statvfs(self):
1927 # issue #9645
1928 for fn in self.unicodefn:
1929 # should not fail with file not found error
1930 fullname = os.path.join(self.dir, fn)
1931 os.statvfs(fullname)
1932
1933 def test_stat(self):
1934 for fn in self.unicodefn:
1935 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001936
Brian Curtineb24d742010-04-12 17:16:38 +00001937@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1938class Win32KillTests(unittest.TestCase):
def _kill(self, sig):
    # Start sys.executable as a subprocess, wait until the child signals
    # that its interpreter is ready, then send *sig* to it via os.kill and
    # check that the child's return code is equal to *sig*.
    import ctypes
    from ctypes import wintypes
    import msvcrt

    # Since we can't access the contents of the process' stdout until the
    # process has exited, use PeekNamedPipe to see what's inside stdout
    # without waiting. This is done so we can tell that the interpreter
    # is started and running at a point where it could handle a signal.
    PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
    PeekNamedPipe.restype = wintypes.BOOL
    dword_ptr = ctypes.POINTER(wintypes.DWORD)
    PeekNamedPipe.argtypes = (
        wintypes.HANDLE,                # Pipe handle
        ctypes.POINTER(ctypes.c_char),  # stdout buf
        wintypes.DWORD,                 # Buffer size
        dword_ptr,                      # bytes read
        dword_ptr,                      # bytes avail
        dword_ptr,                      # bytes left
    )

    msg = "running"
    # The child prints the marker, flushes it and then blocks in input().
    child_code = ("import sys;"
                  "sys.stdout.write('{}');"
                  "sys.stdout.flush();"
                  "input()").format(msg)
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdin=subprocess.PIPE)
    for pipe in (proc.stdout, proc.stderr, proc.stdin):
        self.addCleanup(pipe.close)

    # Poll for the marker every 0.1s, at most max_attempts times, for as
    # long as the child is still running.
    attempts, max_attempts = 0, 100
    while attempts < max_attempts and proc.poll() is None:
        # Create a string buffer to store the result of stdout from the pipe
        buf = ctypes.create_string_buffer(len(msg))
        # Obtain the text currently in proc.stdout
        # Bytes read/avail/left are left as NULL and unused
        handle = msvcrt.get_osfhandle(proc.stdout.fileno())
        rslt = PeekNamedPipe(handle, buf, ctypes.sizeof(buf),
                             None, None, None)
        self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
        if buf.value:
            self.assertEqual(msg, buf.value.decode())
            break
        time.sleep(0.1)
        attempts += 1
    else:
        # The loop ended without seeing the marker.
        self.fail("Did not receive communication from the subprocess")

    os.kill(proc.pid, sig)
    self.assertEqual(proc.wait(), sig)
1992
1993 def test_kill_sigterm(self):
1994 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001995 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001996
1997 def test_kill_int(self):
1998 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001999 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002000
2001 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002002 tagname = "test_os_%s" % uuid.uuid1()
2003 m = mmap.mmap(-1, 1, tagname)
2004 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002005 # Run a script which has console control handling enabled.
2006 proc = subprocess.Popen([sys.executable,
2007 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002008 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002009 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2010 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002011 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002012 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002013 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002014 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002015 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002016 count += 1
2017 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002018 # Forcefully kill the process if we weren't able to signal it.
2019 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002020 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002021 os.kill(proc.pid, event)
2022 # proc.send_signal(event) could also be done here.
2023 # Allow time for the signal to be passed and the process to exit.
2024 time.sleep(0.5)
2025 if not proc.poll():
2026 # Forcefully kill the process if we weren't able to signal it.
2027 os.kill(proc.pid, signal.SIGINT)
2028 self.fail("subprocess did not stop on {}".format(name))
2029
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002030 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002031 def test_CTRL_C_EVENT(self):
2032 from ctypes import wintypes
2033 import ctypes
2034
2035 # Make a NULL value by creating a pointer with no argument.
2036 NULL = ctypes.POINTER(ctypes.c_int)()
2037 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2038 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2039 wintypes.BOOL)
2040 SetConsoleCtrlHandler.restype = wintypes.BOOL
2041
2042 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002043 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002044 # by subprocesses.
2045 SetConsoleCtrlHandler(NULL, 0)
2046
2047 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2048
2049 def test_CTRL_BREAK_EVENT(self):
2050 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2051
2052
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/FILE0 and SUB1/FILE1, and keep
        # the sorted entry names that os.listdir() is expected to report.
        self.created_paths = []
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, subdir))
            target = os.path.join(support.TESTFN, filename)
            with open(target, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % target)
            self.created_paths += [subdir, filename]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        extended = '\\\\?\\' + absolute
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)

        # bytes
        extended_bytes = b'\\\\?\\' + os.fsencode(absolute)
        expected_bytes = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(sorted(os.listdir(extended_bytes)), expected_bytes)
Tim Golden781bbeb2013-10-25 20:24:06 +01002099
2100
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """os.readlink() with str, bytes and path-like arguments."""

    # Link name and target, each in str and in bytes form.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must already exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on a regular file raises OSError, whether the file is
        # named by a str or by a path-like object.
        candidates = (self.filelink_target, FakePath(self.filelink_target))
        for candidate in candidates:
            with self.assertRaises(OSError):
                os.readlink(candidate)

    def test_missing_link(self):
        for candidate in ('missing-link', FakePath('missing-link')):
            with self.assertRaises(FileNotFoundError):
                os.readlink(candidate)

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2146
2147
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link creation, inspection and removal on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use assert* methods rather than bare assert statements so the
        # fixture preconditions are still checked under "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # lexists() is true for the link itself even when its target is
        # gone, so a dangling link is removed as well.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name given as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from the parent dir of the link's dir
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a subdirectory of the link's dir
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Try str first, then bytes, since that is a separate code path.
            for link_src, link_dest in ((src, dest),
                                        (os.fsencode(src), os.fsencode(dest))):
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    pass
                else:
                    # Best-effort removal of a link that did get created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002307
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Creation and removal of NTFS junction points via _winapi."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use assert* methods rather than bare assert statements so the
        # fixture preconditions are still checked under "python -O".
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        # lexists() also reports a junction whose target has disappeared.
        if os.path.lexists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2337
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Checks that go through the low-level ``nt`` module directly."""

    def setUp(self):
        from test import support
        self.nt = support.import_module('nt')

    def test_getfinalpathname_handles(self):
        """nt._getfinalpathname() and os.stat() must not leak handles.

        The process handle count is sampled before and after a burst of
        calls, and the two samples must be equal.
        """
        try:
            import ctypes
            import ctypes.wintypes
        except ImportError:
            raise unittest.SkipTest('ctypes module is required for this test')

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = ctypes.wintypes.DWORD()
        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        before_count = handle_count.value

        # The "\\?\" names are meant to exercise the error path; __file__
        # tests the success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected for some names; only the handle count
                # matters.  Catch Exception rather than using a bare except
                # so KeyboardInterrupt and SystemExit still propagate.
                try:
                    self.nt._getfinalpathname(name)
                except Exception:
                    pass
                try:
                    os.stat(name)
                except Exception:
                    pass

        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        handle_delta = handle_count.value - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002394
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Directory symlinks whose target is given relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # assertTrue instead of a bare assert statement: the latter is
        # stripped under "python -O", which would leave no check at all.
        self.assertTrue(os.path.isdir(src))
2425
2426
class FSEncodingTests(unittest.TestCase):
    """Basic checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002440
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002441
Brett Cannonefb00c02012-02-29 18:31:31 -05002442
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or all(hasattr(locale, attr)
                   for attr in ('nl_langinfo', 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 attached to a terminal, an encoding name known to the
        # codecs registry must be reported.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        codec_info = codecs.lookup(encoding)
        self.assertTrue(codec_info)
2456
2457
class PidTests(unittest.TestCase):
    """Process-id related functions: os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        executable = FakePath(argv[0])
        pid = os.spawnv(os.P_NOWAIT, executable, argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2474
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002475
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a tiny exit script and return the argv list that runs it.

        The script exits with self.exitcode (17).  With *with_env*, self.env
        becomes a copy of os.environ plus a unique variable (self.key) that
        the script reads before exiting.  With *use_bytes*, the argv items
        (and the environment, when one was built) are encoded to bytes.
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when an environment was requested.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, expect the exit code in the high byte.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* (os.spawnve or os.spawnvpe) treats odd env data.

        Each invalid entry must either raise ValueError or make the call
        return 127.  An '=' inside a *value* is valid and must reach the
        child unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = [
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        ]
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                try:
                    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
                except ValueError:
                    pass
                else:
                    # NOTE(review): 127 is presumably the exit status used
                    # when the child could not be started -- confirm.
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2654
Serhiy Storchaka77703942017-06-25 07:33:01 +03002655
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Sanity check for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
2664
2665
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, read it back, then try to put
        # the original value back.
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), original + 1)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and observed <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % original)
            self.assertEqual(observed, original + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as exc:
                # EACCES while restoring the old value is tolerated; any
                # other failure is re-raised.
                if exc.errno != errno.EACCES:
                    raise
2688
2689
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Small asyncore-based TCP server that runs in its own thread.

    Each accepted connection is wrapped in a Handler, which greets the
    peer with b"220 ready\\r\\n" and stores everything it receives.  The
    most recently created handler is available as ``handler_instance``.
    """

    class Handler(asynchat.async_chat):
        """Channel for one accepted connection; buffers received data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When false, received data is read from the socket but dropped.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all bytes accumulated so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # The bound address is looked up after bind() and exposed to callers.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the polling loop to exit and join the server thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        try:
            while self._active and asyncore.socket_map:
                # 'with' guarantees the lock is released even when a
                # handler's handle_error() re-raises out of asyncore.loop().
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
        finally:
            # Always close the registered channels, including on error.
            asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2774
2775
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() using a local SendfileTestServer as peer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET included) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare length first, then content, like the other tests here.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # The server does not need to keep whatever might get through.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # The server does not need to keep whatever might get through.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                        trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated for this call.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002980
2981
def supports_extended_attributes():
    """Return True if os.setxattr() works on a freshly created TESTFN.

    The probe file is created exclusively and is always unlinked again.
    """
    if hasattr(os, "setxattr"):
        try:
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    usable = False
                else:
                    usable = True
        finally:
            support.unlink(support.TESTFN)
        return usable

    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07002996
2997
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr(), setxattr(), removexattr() and listxattr()."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # 's' converts an attribute name into the type being tested; the
        # four callables are the xattr primitives to drive.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_oserror(code, func, *args):
            # func(*args, **kwargs) must fail with the given errno.
            with self.assertRaises(OSError) as cm:
                func(*args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        # Nothing has been set yet.
        expect_oserror(errno.ENODATA, getxattr, path, s("user.test"))

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses existing names, XATTR_REPLACE missing ones.
        expect_oserror(errno.EEXIST, setxattr,
                       path, s("user.test"), b"bye", os.XATTR_CREATE)
        expect_oserror(errno.ENODATA, setxattr,
                       path, s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        expect_oserror(errno.ENODATA, getxattr, path, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Finally set a larger number of attributes on the same file.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names first, then with bytes names.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but each primitive receives a file descriptor
        # opened on the path instead of the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as f:
                return os.getxattr(f.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as f:
                os.setxattr(f.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as f:
                os.removexattr(f.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as f:
                return os.listxattr(f.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3081
3082
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        stty_failures = (FileNotFoundError, subprocess.CalledProcessError,
                         PermissionError)
        try:
            fields = subprocess.check_output(['stty', 'size']).decode().split()
        except stty_failures:
            self.skipTest("stty invocation failed")
        # stty prints the two numbers in the reverse order
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
3126
3127
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to os calls."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: the plain name plus a str-subclass instance of it.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Bytes-like names: bytes, bytearray and memoryview of one name.
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry is (names to try, function, *extra positional args).
        funcs = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            funcs.extend((
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                (text_names, os.listdir, ),
            ))
        else:
            funcs.extend((
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ))

        # Functions which only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs.extend((
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ))
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((text_names, os.readlink,))
            else:
                funcs.append((every_name, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Other argument types must trigger a warning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                    else:
                        func(name, *func_args)
                except OSError as err:
                    # The exception must carry the original object.
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3223
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3232
Victor Stinnerdaf45552013-08-28 00:53:59 +02003233
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        # Open this source file read-only and close the fd at cleanup time.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        fd2 = self._open_this_file()
        self.assertEqual(os.dup2(fd, fd2), fd2)
        self.assertTrue(os.get_inheritable(fd2))

        # force non-inheritable
        fd3 = self._open_this_file()
        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
        self.assertFalse(os.get_inheritable(fd3))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
3312
3313
class PathTConverterTests(unittest.TestCase):
    """Check which argument types the listed os functions accept as a path."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        # For every function in self.functions that exists on this platform:
        # str, bytes and path-like arguments must work, a path-like object
        # whose __fspath__() returns an int must raise TypeError, and a raw
        # fd must work only where the table says so.
        str_filename = support.TESTFN
        if os.name == 'nt':
            # Bytes variants are not exercised on Windows; the None entries
            # are skipped in the loop below.
            bytes_fspath = bytes_filename = None
        else:
            # os.fsencode() uses the filesystem encoding, so this also works
            # when the temporary file name is not pure ASCII.
            bytes_filename = os.fsencode(support.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the fd, then remove the file.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)

    def test_path_t_converter_and_custom_class(self):
        # The TypeError message must name the type that __fspath__() returned.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        with self.assertRaisesRegex(TypeError, msg % r'int'):
            os.stat(FakePath(2))
        with self.assertRaisesRegex(TypeError, msg % r'float'):
            os.stat(FakePath(2.34))
        with self.assertRaisesRegex(TypeError, msg % r'object'):
            os.stat(FakePath(object()))
3378
Brett Cannon3f9183b2016-08-26 14:44:48 -07003379
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip a descriptor's blocking flag through os.set_blocking()."""

    def test_blocking(self):
        # A descriptor freshly opened on this file reports blocking mode.
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        self.assertEqual(os.get_blocking(descriptor), True)

        # Switch blocking off and then back on, reading the flag each time.
        for wanted in (False, True):
            os.set_blocking(descriptor, wanted)
            self.assertEqual(os.get_blocking(descriptor), wanted)
3393
3394
Yury Selivanov97e2e062014-09-26 12:33:06 -04003395
class ExportsTests(unittest.TestCase):
    """Sanity-check the names published in os.__all__."""

    def test_os_all(self):
        # Both names must be listed in os.__all__.
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3400
3401
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Every test works inside a freshly created directory that is
        # removed again by the registered cleanup.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create *name* inside the test directory and return its full path.
        # The directory part is bytes when *name* is bytes, str otherwise.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Return a {name: DirEntry} mapping for the test directory after
        # checking that its sorted entry names equal *names*.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # With skip_fields true, compare the st_* attributes one by one but
        # ignore st_dev, st_ino and st_nlink; otherwise compare the two stat
        # results as a whole.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Verify *entry* against the caller's expectations (is_dir, is_file
        # and is_symlink) and against what os.stat() and os.path report
        # for the same path.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations supplied by the caller; is_dir()/is_file() follow
        # symlinks by default.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # The last argument selects the field-by-field comparison that
        # skips st_dev, st_ino and st_nlink (see assert_stat_equal()).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the only entry of the test directory, which must be called
        # *name*; the directory is scanned as bytes when *name* is bytes.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields for it.
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            names = sorted(entry.name for entry in os.scandir())
            self.assertEqual(names, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target turns symlink.txt into a broken symlink.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        # Bytes-like (non-bytes) paths must emit a DeprecationWarning, and
        # the resulting name/path attributes must be real bytes objects.
        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning a file descriptor, the entry path is
                # expected to be just the entry name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # A partially consumed, unclosed iterator must warn when collected.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3734
Victor Stinner6036e442015-03-08 01:58:04 +01003735
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Kept as a class attribute so a subclass can swap in the pure Python
    # implementation when a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes objects come back unchanged.
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str objects come back unchanged.
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        # Whichever type the path-like object wraps, fsencode() must give
        # bytes and fsdecode() must give str.
        as_text = "path/like/object"
        as_bytes = b"path/like/object"
        for payload in (as_text, as_bytes):
            wrapped = FakePath(payload)

            self.assertEqual(payload, self.fspath(wrapped))
            self.assertEqual(as_bytes, os.fsencode(wrapped))
            self.assertEqual(as_text, os.fsdecode(wrapped))

    def test_pathlike(self):
        tag = '#feelthegil'
        self.assertEqual(tag, self.fspath(FakePath(tag)))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are neither str, bytes nor path-like are rejected.
        vapor_cls = type('blah', (), {})
        for candidate in (int, type, os, vapor_cls()):
            self.assertRaises(TypeError, self.fspath, candidate)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable_cls = type('foo', (), {})
        not_callable_cls.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, not_callable_cls())
        # __fspath__ raises an exception.
        failing = FakePath(ZeroDivisionError())
        self.assertRaises(ZeroDivisionError, self.fspath, failing)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003781
Victor Stinnerc29b5852017-11-02 07:28:27 -07003782
class TimesTests(unittest.TestCase):
    """Checks on the result object returned by os.times()."""

    def test_times(self):
        snapshot = os.times()
        self.assertIsInstance(snapshot, os.times_result)

        # Every named field must be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(snapshot, field_name), float)

        # On Windows these three fields are expected to be zero.
        if os.name == 'nt':
            for zero_field in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(snapshot, zero_field), 0)
3797
3798
# When no C version is provided, TestPEP519 already exercises the pure Python
# implementation, so the extra class is only defined if os._fspath exists.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Run the TestPEP519 tests against os._fspath, the pure Python
        implementation of os.fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003807
3808
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()