blob: a2021b1eba068f196c39bcb5a9c26dd2f1283e05 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
Antoine Pitrouec34ab52013-08-16 20:44:38 +020033try:
34 import resource
35except ImportError:
36 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020037try:
38 import fcntl
39except ImportError:
40 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010041try:
42 import _winapi
43except ImportError:
44 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020045try:
R David Murrayf2ad1732014-12-25 18:36:56 -050046 import grp
47 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
48 if hasattr(os, 'getgid'):
49 process_gid = os.getgid()
50 if process_gid not in groups:
51 groups.append(process_gid)
52except ImportError:
53 groups = []
54try:
55 import pwd
56 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010057except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050058 all_users = []
59try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020060 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020061except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Berker Peksagce643912015-05-06 06:33:17 +030064from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020065from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000066
Victor Stinner923590e2016-03-24 09:11:48 +010067
R David Murrayf2ad1732014-12-25 18:36:56 -050068root_in_posix = False
69if hasattr(os, 'geteuid'):
70 root_in_posix = (os.geteuid() == 0)
71
Mark Dickinson7cf03892010-04-16 13:45:35 +000072# Detect whether we're on a Linux system that uses the (now outdated
73# and unmaintained) linuxthreads threading library. There's an issue
74# when combining linuxthreads with a failed execv call: see
75# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020076if hasattr(sys, 'thread_info') and sys.thread_info.version:
77 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
78else:
79 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000080
Stefan Krahebee49a2013-01-17 15:31:00 +010081# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
82HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
83
Victor Stinner923590e2016-03-24 09:11:48 +010084
Berker Peksag4af23d72016-09-15 20:32:44 +030085def requires_os_func(name):
86 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
87
88
Victor Stinnerae39d232016-03-24 17:12:55 +010089def create_file(filename, content=b'content'):
90 with open(filename, "xb", 0) as fp:
91 fp.write(content)
92
93
Thomas Wouters0e3f5912006-08-11 14:57:12 +000094# Tests creating TESTFN
95class FileTests(unittest.TestCase):
96 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000097 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000098 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000099 tearDown = setUp
100
101 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000102 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000103 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000104 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000105
Christian Heimesfdab48e2008-01-20 09:06:41 +0000106 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000107 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
108 # We must allocate two consecutive file descriptors, otherwise
109 # it will mess up other file descriptors (perhaps even the three
110 # standard ones).
111 second = os.dup(first)
112 try:
113 retries = 0
114 while second != first + 1:
115 os.close(first)
116 retries += 1
117 if retries > 10:
118 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000119 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000120 first, second = second, os.dup(second)
121 finally:
122 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000123 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000124 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000125 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000126
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000127 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000128 def test_rename(self):
129 path = support.TESTFN
130 old = sys.getrefcount(path)
131 self.assertRaises(TypeError, os.rename, path, 0)
132 new = sys.getrefcount(path)
133 self.assertEqual(old, new)
134
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000135 def test_read(self):
136 with open(support.TESTFN, "w+b") as fobj:
137 fobj.write(b"spam")
138 fobj.flush()
139 fd = fobj.fileno()
140 os.lseek(fd, 0, 0)
141 s = os.read(fd, 4)
142 self.assertEqual(type(s), bytes)
143 self.assertEqual(s, b"spam")
144
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200145 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200146 # Skip the test on 32-bit platforms: the number of bytes must fit in a
147 # Py_ssize_t type
148 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
149 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200150 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
151 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200152 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100153 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200154
155 # Issue #21932: Make sure that os.read() does not raise an
156 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200157 with open(support.TESTFN, "rb") as fp:
158 data = os.read(fp.fileno(), size)
159
Victor Stinner8c663fd2017-11-08 14:44:44 -0800160 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200161 # operating system is free to return less bytes than requested.
162 self.assertEqual(data, b'test')
163
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000164 def test_write(self):
165 # os.write() accepts bytes- and buffer-like objects but not strings
166 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
167 self.assertRaises(TypeError, os.write, fd, "beans")
168 os.write(fd, b"bacon\n")
169 os.write(fd, bytearray(b"eggs\n"))
170 os.write(fd, memoryview(b"spam\n"))
171 os.close(fd)
172 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000173 self.assertEqual(fobj.read().splitlines(),
174 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000175
Victor Stinnere0daff12011-03-20 23:36:35 +0100176 def write_windows_console(self, *args):
177 retcode = subprocess.call(args,
178 # use a new console to not flood the test output
179 creationflags=subprocess.CREATE_NEW_CONSOLE,
180 # use a shell to hide the console window (SW_HIDE)
181 shell=True)
182 self.assertEqual(retcode, 0)
183
184 @unittest.skipUnless(sys.platform == 'win32',
185 'test specific to the Windows console')
186 def test_write_windows_console(self):
187 # Issue #11395: the Windows console returns an error (12: not enough
188 # space error) on writing into stdout if stdout mode is binary and the
189 # length is greater than 66,000 bytes (or less, depending on heap
190 # usage).
191 code = "print('x' * 100000)"
192 self.write_windows_console(sys.executable, "-c", code)
193 self.write_windows_console(sys.executable, "-u", "-c", code)
194
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000195 def fdopen_helper(self, *args):
196 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200197 f = os.fdopen(fd, *args)
198 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000199
200 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200201 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
202 os.close(fd)
203
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000204 self.fdopen_helper()
205 self.fdopen_helper('r')
206 self.fdopen_helper('r', 100)
207
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100208 def test_replace(self):
209 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100210 self.addCleanup(support.unlink, support.TESTFN)
211 self.addCleanup(support.unlink, TESTFN2)
212
213 create_file(support.TESTFN, b"1")
214 create_file(TESTFN2, b"2")
215
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100216 os.replace(support.TESTFN, TESTFN2)
217 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
218 with open(TESTFN2, 'r') as f:
219 self.assertEqual(f.read(), "1")
220
Martin Panterbf19d162015-09-09 01:01:13 +0000221 def test_open_keywords(self):
222 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
223 dir_fd=None)
224 os.close(f)
225
226 def test_symlink_keywords(self):
227 symlink = support.get_attribute(os, "symlink")
228 try:
229 symlink(src='target', dst=support.TESTFN,
230 target_is_directory=False, dir_fd=None)
231 except (NotImplementedError, OSError):
232 pass # No OS support or unprivileged user
233
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235# Test attributes on return values from os.*stat* family.
236class StatAttributeTests(unittest.TestCase):
237 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200238 self.fname = support.TESTFN
239 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100240 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000243 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244
245 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(result[stat.ST_SIZE], 3)
247 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 # Make sure all the attributes are there
250 members = dir(result)
251 for name in dir(stat):
252 if name[:3] == 'ST_':
253 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000254 if name.endswith("TIME"):
255 def trunc(x): return int(x)
256 else:
257 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000260 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700263 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700264 for name in 'st_atime st_mtime st_ctime'.split():
265 floaty = int(getattr(result, name) * 100000)
266 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700267 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 try:
270 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 except IndexError:
273 pass
274
275 # Make sure that assignment fails
276 try:
277 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200278 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000279 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 pass
281
282 try:
283 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200284 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000285 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 pass
287
288 try:
289 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except AttributeError:
292 pass
293
294 # Use the stat_result constructor with a too-short tuple.
295 try:
296 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 except TypeError:
299 pass
300
Ezio Melotti42da6632011-03-15 05:18:48 +0200301 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 try:
303 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
304 except TypeError:
305 pass
306
Antoine Pitrou38425292010-09-21 18:19:07 +0000307 def test_stat_attributes(self):
308 self.check_stat_attributes(self.fname)
309
310 def test_stat_attributes_bytes(self):
311 try:
312 fname = self.fname.encode(sys.getfilesystemencoding())
313 except UnicodeEncodeError:
314 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700329 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100340 self.assertTrue(isinstance(result.f_fsid, int))
341
342 # Test that the size of the tuple doesn't change
343 self.assertEqual(len(result), 10)
344
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700374 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Berker Peksag0b4dc482016-09-17 15:49:59 +0300434 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
435 def test_access_denied(self):
436 # Default to FindFirstFile WIN32_FIND_DATA when access is
437 # denied. See issue 28075.
438 # os.environ['TEMP'] should be located on a volume that
439 # supports file ACLs.
440 fname = os.path.join(os.environ['TEMP'], self.fname)
441 self.addCleanup(support.unlink, fname)
442 create_file(fname, b'ABC')
443 # Deny the right to [S]YNCHRONIZE on the file to
444 # force CreateFile to fail with ERROR_ACCESS_DENIED.
445 DETACHED_PROCESS = 8
446 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500447 # bpo-30584: Use security identifier *S-1-5-32-545 instead
448 # of localized "Users" to not depend on the locale.
449 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300450 creationflags=DETACHED_PROCESS
451 )
452 result = os.stat(fname)
453 self.assertNotEqual(result.st_size, 0)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
456class UtimeTests(unittest.TestCase):
457 def setUp(self):
458 self.dirname = support.TESTFN
459 self.fname = os.path.join(self.dirname, "f1")
460
461 self.addCleanup(support.rmtree, self.dirname)
462 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100463 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200464
Victor Stinner47aacc82015-06-12 17:26:23 +0200465 def support_subsecond(self, filename):
466 # Heuristic to check if the filesystem supports timestamp with
467 # subsecond resolution: check if float and int timestamps are different
468 st = os.stat(filename)
469 return ((st.st_atime != st[7])
470 or (st.st_mtime != st[8])
471 or (st.st_ctime != st[9]))
472
473 def _test_utime(self, set_time, filename=None):
474 if not filename:
475 filename = self.fname
476
477 support_subsecond = self.support_subsecond(filename)
478 if support_subsecond:
479 # Timestamp with a resolution of 1 microsecond (10^-6).
480 #
481 # The resolution of the C internal function used by os.utime()
482 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
483 # test with a resolution of 1 ns requires more work:
484 # see the issue #15745.
485 atime_ns = 1002003000 # 1.002003 seconds
486 mtime_ns = 4005006000 # 4.005006 seconds
487 else:
488 # use a resolution of 1 second
489 atime_ns = 5 * 10**9
490 mtime_ns = 8 * 10**9
491
492 set_time(filename, (atime_ns, mtime_ns))
493 st = os.stat(filename)
494
495 if support_subsecond:
496 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
497 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
498 else:
499 self.assertEqual(st.st_atime, atime_ns * 1e-9)
500 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
501 self.assertEqual(st.st_atime_ns, atime_ns)
502 self.assertEqual(st.st_mtime_ns, mtime_ns)
503
504 def test_utime(self):
505 def set_time(filename, ns):
506 # test the ns keyword parameter
507 os.utime(filename, ns=ns)
508 self._test_utime(set_time)
509
510 @staticmethod
511 def ns_to_sec(ns):
512 # Convert a number of nanosecond (int) to a number of seconds (float).
513 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
514 # issue, os.utime() rounds towards minus infinity.
515 return (ns * 1e-9) + 0.5e-9
516
517 def test_utime_by_indexed(self):
518 # pass times as floating point seconds as the second indexed parameter
519 def set_time(filename, ns):
520 atime_ns, mtime_ns = ns
521 atime = self.ns_to_sec(atime_ns)
522 mtime = self.ns_to_sec(mtime_ns)
523 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
524 # or utime(time_t)
525 os.utime(filename, (atime, mtime))
526 self._test_utime(set_time)
527
528 def test_utime_by_times(self):
529 def set_time(filename, ns):
530 atime_ns, mtime_ns = ns
531 atime = self.ns_to_sec(atime_ns)
532 mtime = self.ns_to_sec(mtime_ns)
533 # test the times keyword parameter
534 os.utime(filename, times=(atime, mtime))
535 self._test_utime(set_time)
536
537 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
538 "follow_symlinks support for utime required "
539 "for this test.")
540 def test_utime_nofollow_symlinks(self):
541 def set_time(filename, ns):
542 # use follow_symlinks=False to test utimensat(timespec)
543 # or lutimes(timeval)
544 os.utime(filename, ns=ns, follow_symlinks=False)
545 self._test_utime(set_time)
546
547 @unittest.skipUnless(os.utime in os.supports_fd,
548 "fd support for utime required for this test.")
549 def test_utime_fd(self):
550 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100551 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200552 # use a file descriptor to test futimens(timespec)
553 # or futimes(timeval)
554 os.utime(fp.fileno(), ns=ns)
555 self._test_utime(set_time)
556
557 @unittest.skipUnless(os.utime in os.supports_dir_fd,
558 "dir_fd support for utime required for this test.")
559 def test_utime_dir_fd(self):
560 def set_time(filename, ns):
561 dirname, name = os.path.split(filename)
562 dirfd = os.open(dirname, os.O_RDONLY)
563 try:
564 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
565 os.utime(name, dir_fd=dirfd, ns=ns)
566 finally:
567 os.close(dirfd)
568 self._test_utime(set_time)
569
570 def test_utime_directory(self):
571 def set_time(filename, ns):
572 # test calling os.utime() on a directory
573 os.utime(filename, ns=ns)
574 self._test_utime(set_time, filename=self.dirname)
575
576 def _test_utime_current(self, set_time):
577 # Get the system clock
578 current = time.time()
579
580 # Call os.utime() to set the timestamp to the current system clock
581 set_time(self.fname)
582
583 if not self.support_subsecond(self.fname):
584 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700585 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200586 # On Windows, the usual resolution of time.time() is 15.6 ms.
587 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700588 #
589 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
590 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200591 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200592 st = os.stat(self.fname)
593 msg = ("st_time=%r, current=%r, dt=%r"
594 % (st.st_mtime, current, st.st_mtime - current))
595 self.assertAlmostEqual(st.st_mtime, current,
596 delta=delta, msg=msg)
597
598 def test_utime_current(self):
599 def set_time(filename):
600 # Set to the current time in the new way
601 os.utime(self.fname)
602 self._test_utime_current(set_time)
603
604 def test_utime_current_old(self):
605 def set_time(filename):
606 # Set to the current time in the old explicit way.
607 os.utime(self.fname, None)
608 self._test_utime_current(set_time)
609
610 def get_file_system(self, path):
611 if sys.platform == 'win32':
612 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
613 import ctypes
614 kernel32 = ctypes.windll.kernel32
615 buf = ctypes.create_unicode_buffer("", 100)
616 ok = kernel32.GetVolumeInformationW(root, None, 0,
617 None, None, None,
618 buf, len(buf))
619 if ok:
620 return buf.value
621 # return None if the filesystem is unknown
622
623 def test_large_time(self):
624 # Many filesystems are limited to the year 2038. At least, the test
625 # pass with NTFS filesystem.
626 if self.get_file_system(self.dirname) != "NTFS":
627 self.skipTest("requires NTFS")
628
629 large = 5000000000 # some day in 2128
630 os.utime(self.fname, (large, large))
631 self.assertEqual(os.stat(self.fname).st_mtime, large)
632
633 def test_utime_invalid_arguments(self):
634 # seconds and nanoseconds parameters are mutually exclusive
635 with self.assertRaises(ValueError):
636 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200637 with self.assertRaises(TypeError):
638 os.utime(self.fname, [5, 5])
639 with self.assertRaises(TypeError):
640 os.utime(self.fname, (5,))
641 with self.assertRaises(TypeError):
642 os.utime(self.fname, (5, 5, 5))
643 with self.assertRaises(TypeError):
644 os.utime(self.fname, ns=[5, 5])
645 with self.assertRaises(TypeError):
646 os.utime(self.fname, ns=(5,))
647 with self.assertRaises(TypeError):
648 os.utime(self.fname, ns=(5, 5, 5))
649
650 if os.utime not in os.supports_follow_symlinks:
651 with self.assertRaises(NotImplementedError):
652 os.utime(self.fname, (5, 5), follow_symlinks=False)
653 if os.utime not in os.supports_fd:
654 with open(self.fname, 'wb', 0) as fp:
655 with self.assertRaises(TypeError):
656 os.utime(fp.fileno(), (5, 5))
657 if os.utime not in os.supports_dir_fd:
658 with self.assertRaises(NotImplementedError):
659 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200660
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300661 @support.cpython_only
662 def test_issue31577(self):
663 # The interpreter shouldn't crash in case utime() received a bad
664 # ns argument.
665 def get_bad_int(divmod_ret_val):
666 class BadInt:
667 def __divmod__(*args):
668 return divmod_ret_val
669 return BadInt()
670 with self.assertRaises(TypeError):
671 os.utime(self.fname, ns=(get_bad_int(42), 1))
672 with self.assertRaises(TypeError):
673 os.utime(self.fname, ns=(get_bad_int(()), 1))
674 with self.assertRaises(TypeError):
675 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
676
Victor Stinner47aacc82015-06-12 17:26:23 +0200677
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000678from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000679
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000680class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000681 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000682 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000683
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000684 def setUp(self):
685 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000686 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000687 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000688 for key, value in self._reference().items():
689 os.environ[key] = value
690
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000691 def tearDown(self):
692 os.environ.clear()
693 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000694 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000695 os.environb.clear()
696 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000697
Christian Heimes90333392007-11-01 19:08:42 +0000698 def _reference(self):
699 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
700
701 def _empty_mapping(self):
702 os.environ.clear()
703 return os.environ
704
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000705 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200706 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
707 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000708 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000709 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300710 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200711 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300712 value = popen.read().strip()
713 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000714
Xavier de Gayed1415312016-07-22 12:15:29 +0200715 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
716 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000717 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200718 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
719 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300720 it = iter(popen)
721 self.assertEqual(next(it), "line1\n")
722 self.assertEqual(next(it), "line2\n")
723 self.assertEqual(next(it), "line3\n")
724 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000725
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000726 # Verify environ keys and values from the OS are of the
727 # correct str type.
728 def test_keyvalue_types(self):
729 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000730 self.assertEqual(type(key), str)
731 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000732
Christian Heimes90333392007-11-01 19:08:42 +0000733 def test_items(self):
734 for key, value in self._reference().items():
735 self.assertEqual(os.environ.get(key), value)
736
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000737 # Issue 7310
738 def test___repr__(self):
739 """Check that the repr() of os.environ looks like environ({...})."""
740 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000741 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
742 '{!r}: {!r}'.format(key, value)
743 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000744
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000745 def test_get_exec_path(self):
746 defpath_list = os.defpath.split(os.pathsep)
747 test_path = ['/monty', '/python', '', '/flying/circus']
748 test_env = {'PATH': os.pathsep.join(test_path)}
749
750 saved_environ = os.environ
751 try:
752 os.environ = dict(test_env)
753 # Test that defaulting to os.environ works.
754 self.assertSequenceEqual(test_path, os.get_exec_path())
755 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
756 finally:
757 os.environ = saved_environ
758
759 # No PATH environment variable
760 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
761 # Empty PATH environment variable
762 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
763 # Supplied PATH environment variable
764 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
765
Victor Stinnerb745a742010-05-18 17:17:23 +0000766 if os.supports_bytes_environ:
767 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000768 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000769 # ignore BytesWarning warning
770 with warnings.catch_warnings(record=True):
771 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000772 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000773 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000774 pass
775 else:
776 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000777
778 # bytes key and/or value
779 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
780 ['abc'])
781 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
782 ['abc'])
783 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
784 ['abc'])
785
786 @unittest.skipUnless(os.supports_bytes_environ,
787 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000788 def test_environb(self):
789 # os.environ -> os.environb
790 value = 'euro\u20ac'
791 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000792 value_bytes = value.encode(sys.getfilesystemencoding(),
793 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000794 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000795 msg = "U+20AC character is not encodable to %s" % (
796 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000797 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000798 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000799 self.assertEqual(os.environ['unicode'], value)
800 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000801
802 # os.environb -> os.environ
803 value = b'\xff'
804 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000805 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000806 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000807 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000808
Victor Stinner13ff2452018-01-22 18:32:50 +0100809 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100810 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100811 def test_unset_error(self):
812 if sys.platform == "win32":
813 # an environment variable is limited to 32,767 characters
814 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100815 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100816 else:
817 # "=" is not allowed in a variable name
818 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100819 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100820
Victor Stinner6d101392013-04-14 16:35:04 +0200821 def test_key_type(self):
822 missing = 'missingkey'
823 self.assertNotIn(missing, os.environ)
824
Victor Stinner839e5ea2013-04-14 16:43:03 +0200825 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200826 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200827 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200828 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200829
Victor Stinner839e5ea2013-04-14 16:43:03 +0200830 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200831 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200832 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200833 self.assertTrue(cm.exception.__suppress_context__)
834
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300835 def _test_environ_iteration(self, collection):
836 iterator = iter(collection)
837 new_key = "__new_key__"
838
839 next(iterator) # start iteration over os.environ.items
840
841 # add a new key in os.environ mapping
842 os.environ[new_key] = "test_environ_iteration"
843
844 try:
845 next(iterator) # force iteration over modified mapping
846 self.assertEqual(os.environ[new_key], "test_environ_iteration")
847 finally:
848 del os.environ[new_key]
849
850 def test_iter_error_when_changing_os_environ(self):
851 self._test_environ_iteration(os.environ)
852
853 def test_iter_error_when_changing_os_environ_items(self):
854 self._test_environ_iteration(os.environ.items())
855
856 def test_iter_error_when_changing_os_environ_values(self):
857 self._test_environ_iteration(os.environ.values())
858
Victor Stinner6d101392013-04-14 16:35:04 +0200859
Tim Petersc4e09402003-04-25 07:11:48 +0000860class WalkTests(unittest.TestCase):
861 """Tests for os.walk()."""
862
Victor Stinner0561c532015-03-12 10:28:24 +0100863 # Wrapper to hide minor differences between os.walk and os.fwalk
864 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200865 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200866 if 'follow_symlinks' in kwargs:
867 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200868 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100869
Charles-François Natali7372b062012-02-05 15:15:38 +0100870 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100871 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100872 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000873
874 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000875 # TESTFN/
876 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000877 # tmp1
878 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000879 # tmp2
880 # SUB11/ no kids
881 # SUB2/ a file kid and a dirsymlink kid
882 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300883 # SUB21/ not readable
884 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000885 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200886 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300887 # broken_link2
888 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000889 # TEST2/
890 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100891 self.walk_path = join(support.TESTFN, "TEST1")
892 self.sub1_path = join(self.walk_path, "SUB1")
893 self.sub11_path = join(self.sub1_path, "SUB11")
894 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300895 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100896 tmp1_path = join(self.walk_path, "tmp1")
897 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000898 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300899 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100900 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000901 t2_path = join(support.TESTFN, "TEST2")
902 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200903 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300904 broken_link2_path = join(sub2_path, "broken_link2")
905 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000906
907 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100908 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000909 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300910 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000911 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100912
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300913 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100914 with open(path, "x") as f:
915 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000916
Victor Stinner0561c532015-03-12 10:28:24 +0100917 if support.can_symlink():
918 os.symlink(os.path.abspath(t2_path), self.link_path)
919 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300920 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
921 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300922 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300923 ["broken_link", "broken_link2", "broken_link3",
924 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100925 else:
pxinwr3e028b22019-02-15 13:04:47 +0800926 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100927
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300928 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300929 try:
930 os.listdir(sub21_path)
931 except PermissionError:
932 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
933 else:
934 os.chmod(sub21_path, stat.S_IRWXU)
935 os.unlink(tmp5_path)
936 os.rmdir(sub21_path)
937 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300938
Victor Stinner0561c532015-03-12 10:28:24 +0100939 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000940 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300941 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100942
Tim Petersc4e09402003-04-25 07:11:48 +0000943 self.assertEqual(len(all), 4)
944 # We can't know which order SUB1 and SUB2 will appear in.
945 # Not flipped: TESTFN, SUB1, SUB11, SUB2
946 # flipped: TESTFN, SUB2, SUB1, SUB11
947 flipped = all[0][1][0] != "SUB1"
948 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200949 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300950 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100951 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
952 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
953 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
954 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000955
Brett Cannon3f9183b2016-08-26 14:44:48 -0700956 def test_walk_prune(self, walk_path=None):
957 if walk_path is None:
958 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000959 # Prune the search.
960 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700961 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000962 all.append((root, dirs, files))
963 # Don't descend into SUB1.
964 if 'SUB1' in dirs:
965 # Note that this also mutates the dirs we appended to all!
966 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000967
Victor Stinner0561c532015-03-12 10:28:24 +0100968 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200969 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100970
971 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300972 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100973 self.assertEqual(all[1], self.sub2_tree)
974
Brett Cannon3f9183b2016-08-26 14:44:48 -0700975 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200976 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700977
Victor Stinner0561c532015-03-12 10:28:24 +0100978 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000979 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100980 all = list(self.walk(self.walk_path, topdown=False))
981
Victor Stinner53b0a412016-03-26 01:12:36 +0100982 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000983 # We can't know which order SUB1 and SUB2 will appear in.
984 # Not flipped: SUB11, SUB1, SUB2, TESTFN
985 # flipped: SUB2, SUB11, SUB1, TESTFN
986 flipped = all[3][1][0] != "SUB1"
987 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200988 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300989 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100990 self.assertEqual(all[3],
991 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
992 self.assertEqual(all[flipped],
993 (self.sub11_path, [], []))
994 self.assertEqual(all[flipped + 1],
995 (self.sub1_path, ["SUB11"], ["tmp2"]))
996 self.assertEqual(all[2 - 2 * flipped],
997 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000998
Victor Stinner0561c532015-03-12 10:28:24 +0100999 def test_walk_symlink(self):
1000 if not support.can_symlink():
1001 self.skipTest("need symlink support")
1002
1003 # Walk, following symlinks.
1004 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1005 for root, dirs, files in walk_it:
1006 if root == self.link_path:
1007 self.assertEqual(dirs, [])
1008 self.assertEqual(files, ["tmp4"])
1009 break
1010 else:
1011 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001012
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001013 def test_walk_bad_dir(self):
1014 # Walk top-down.
1015 errors = []
1016 walk_it = self.walk(self.walk_path, onerror=errors.append)
1017 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001018 self.assertEqual(errors, [])
1019 dir1 = 'SUB1'
1020 path1 = os.path.join(root, dir1)
1021 path1new = os.path.join(root, dir1 + '.new')
1022 os.rename(path1, path1new)
1023 try:
1024 roots = [r for r, d, f in walk_it]
1025 self.assertTrue(errors)
1026 self.assertNotIn(path1, roots)
1027 self.assertNotIn(path1new, roots)
1028 for dir2 in dirs:
1029 if dir2 != dir1:
1030 self.assertIn(os.path.join(root, dir2), roots)
1031 finally:
1032 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001033
Charles-François Natali7372b062012-02-05 15:15:38 +01001034
1035@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1036class FwalkTests(WalkTests):
1037 """Tests for os.fwalk()."""
1038
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001039 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001040 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001041 yield (root, dirs, files)
1042
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001043 def fwalk(self, *args, **kwargs):
1044 return os.fwalk(*args, **kwargs)
1045
Larry Hastingsc48fe982012-06-25 04:49:05 -07001046 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1047 """
1048 compare with walk() results.
1049 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001050 walk_kwargs = walk_kwargs.copy()
1051 fwalk_kwargs = fwalk_kwargs.copy()
1052 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1053 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1054 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001055
Charles-François Natali7372b062012-02-05 15:15:38 +01001056 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001057 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001058 expected[root] = (set(dirs), set(files))
1059
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001060 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001061 self.assertIn(root, expected)
1062 self.assertEqual(expected[root], (set(dirs), set(files)))
1063
Larry Hastingsc48fe982012-06-25 04:49:05 -07001064 def test_compare_to_walk(self):
1065 kwargs = {'top': support.TESTFN}
1066 self._compare_to_walk(kwargs, kwargs)
1067
Charles-François Natali7372b062012-02-05 15:15:38 +01001068 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001069 try:
1070 fd = os.open(".", os.O_RDONLY)
1071 walk_kwargs = {'top': support.TESTFN}
1072 fwalk_kwargs = walk_kwargs.copy()
1073 fwalk_kwargs['dir_fd'] = fd
1074 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1075 finally:
1076 os.close(fd)
1077
1078 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001079 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001080 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1081 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001082 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001083 # check that the FD is valid
1084 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001085 # redundant check
1086 os.stat(rootfd)
1087 # check that listdir() returns consistent information
1088 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001089
1090 def test_fd_leak(self):
1091 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1092 # we both check that calling fwalk() a large number of times doesn't
1093 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1094 minfd = os.dup(1)
1095 os.close(minfd)
1096 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001097 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001098 pass
1099 newfd = os.dup(1)
1100 self.addCleanup(os.close, newfd)
1101 self.assertEqual(newfd, minfd)
1102
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001103class BytesWalkTests(WalkTests):
1104 """Tests for os.walk() with bytes."""
1105 def walk(self, top, **kwargs):
1106 if 'follow_symlinks' in kwargs:
1107 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1108 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1109 root = os.fsdecode(broot)
1110 dirs = list(map(os.fsdecode, bdirs))
1111 files = list(map(os.fsdecode, bfiles))
1112 yield (root, dirs, files)
1113 bdirs[:] = list(map(os.fsencode, dirs))
1114 bfiles[:] = list(map(os.fsencode, files))
1115
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001116@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1117class BytesFwalkTests(FwalkTests):
1118 """Tests for os.walk() with bytes."""
1119 def fwalk(self, top='.', *args, **kwargs):
1120 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1121 root = os.fsdecode(broot)
1122 dirs = list(map(os.fsdecode, bdirs))
1123 files = list(map(os.fsdecode, bfiles))
1124 yield (root, dirs, files, topfd)
1125 bdirs[:] = list(map(os.fsencode, dirs))
1126 bfiles[:] = list(map(os.fsencode, files))
1127
Charles-François Natali7372b062012-02-05 15:15:38 +01001128
Guido van Rossume7ba4952007-06-06 23:52:48 +00001129class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001130 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001131 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001132
1133 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001134 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001135 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1136 os.makedirs(path) # Should work
1137 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1138 os.makedirs(path)
1139
1140 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001141 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001142 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1143 os.makedirs(path)
1144 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1145 'dir5', 'dir6')
1146 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001147
Serhiy Storchakae304e332017-03-24 13:27:42 +02001148 def test_mode(self):
1149 with support.temp_umask(0o002):
1150 base = support.TESTFN
1151 parent = os.path.join(base, 'dir1')
1152 path = os.path.join(parent, 'dir2')
1153 os.makedirs(path, 0o555)
1154 self.assertTrue(os.path.exists(path))
1155 self.assertTrue(os.path.isdir(path))
1156 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001157 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1158 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001159
Terry Reedy5a22b652010-12-02 07:05:56 +00001160 def test_exist_ok_existing_directory(self):
1161 path = os.path.join(support.TESTFN, 'dir1')
1162 mode = 0o777
1163 old_mask = os.umask(0o022)
1164 os.makedirs(path, mode)
1165 self.assertRaises(OSError, os.makedirs, path, mode)
1166 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001167 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001168 os.makedirs(path, mode=mode, exist_ok=True)
1169 os.umask(old_mask)
1170
Martin Pantera82642f2015-11-19 04:48:44 +00001171 # Issue #25583: A drive root could raise PermissionError on Windows
1172 os.makedirs(os.path.abspath('/'), exist_ok=True)
1173
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001174 def test_exist_ok_s_isgid_directory(self):
1175 path = os.path.join(support.TESTFN, 'dir1')
1176 S_ISGID = stat.S_ISGID
1177 mode = 0o777
1178 old_mask = os.umask(0o022)
1179 try:
1180 existing_testfn_mode = stat.S_IMODE(
1181 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001182 try:
1183 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001184 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001185 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001186 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1187 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1188 # The os should apply S_ISGID from the parent dir for us, but
1189 # this test need not depend on that behavior. Be explicit.
1190 os.makedirs(path, mode | S_ISGID)
1191 # http://bugs.python.org/issue14992
1192 # Should not fail when the bit is already set.
1193 os.makedirs(path, mode, exist_ok=True)
1194 # remove the bit.
1195 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001196 # May work even when the bit is not already set when demanded.
1197 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001198 finally:
1199 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001200
1201 def test_exist_ok_existing_regular_file(self):
1202 base = support.TESTFN
1203 path = os.path.join(support.TESTFN, 'dir1')
Serhiy Storchaka5b10b982019-03-05 10:06:26 +02001204 with open(path, 'w') as f:
1205 f.write('abc')
Terry Reedy5a22b652010-12-02 07:05:56 +00001206 self.assertRaises(OSError, os.makedirs, path)
1207 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1208 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1209 os.remove(path)
1210
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001211 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001212 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001213 'dir4', 'dir5', 'dir6')
1214 # If the tests failed, the bottom-most directory ('../dir6')
1215 # may not have been created, so we look for the outermost directory
1216 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001217 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001218 path = os.path.dirname(path)
1219
1220 os.removedirs(path)
1221
Andrew Svetlov405faed2012-12-25 12:18:09 +02001222
R David Murrayf2ad1732014-12-25 18:36:56 -05001223@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1224class ChownFileTests(unittest.TestCase):
1225
Berker Peksag036a71b2015-07-21 09:29:48 +03001226 @classmethod
1227 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001228 os.mkdir(support.TESTFN)
1229
1230 def test_chown_uid_gid_arguments_must_be_index(self):
1231 stat = os.stat(support.TESTFN)
1232 uid = stat.st_uid
1233 gid = stat.st_gid
1234 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1235 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1236 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1237 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1238 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1239
1240 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1241 def test_chown(self):
1242 gid_1, gid_2 = groups[:2]
1243 uid = os.stat(support.TESTFN).st_uid
1244 os.chown(support.TESTFN, uid, gid_1)
1245 gid = os.stat(support.TESTFN).st_gid
1246 self.assertEqual(gid, gid_1)
1247 os.chown(support.TESTFN, uid, gid_2)
1248 gid = os.stat(support.TESTFN).st_gid
1249 self.assertEqual(gid, gid_2)
1250
1251 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1252 "test needs root privilege and more than one user")
1253 def test_chown_with_root(self):
1254 uid_1, uid_2 = all_users[:2]
1255 gid = os.stat(support.TESTFN).st_gid
1256 os.chown(support.TESTFN, uid_1, gid)
1257 uid = os.stat(support.TESTFN).st_uid
1258 self.assertEqual(uid, uid_1)
1259 os.chown(support.TESTFN, uid_2, gid)
1260 uid = os.stat(support.TESTFN).st_uid
1261 self.assertEqual(uid, uid_2)
1262
1263 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1264 "test needs non-root account and more than one user")
1265 def test_chown_without_permission(self):
1266 uid_1, uid_2 = all_users[:2]
1267 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001268 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001269 os.chown(support.TESTFN, uid_1, gid)
1270 os.chown(support.TESTFN, uid_2, gid)
1271
Berker Peksag036a71b2015-07-21 09:29:48 +03001272 @classmethod
1273 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001274 os.rmdir(support.TESTFN)
1275
1276
Andrew Svetlov405faed2012-12-25 12:18:09 +02001277class RemoveDirsTests(unittest.TestCase):
1278 def setUp(self):
1279 os.makedirs(support.TESTFN)
1280
1281 def tearDown(self):
1282 support.rmtree(support.TESTFN)
1283
1284 def test_remove_all(self):
1285 dira = os.path.join(support.TESTFN, 'dira')
1286 os.mkdir(dira)
1287 dirb = os.path.join(dira, 'dirb')
1288 os.mkdir(dirb)
1289 os.removedirs(dirb)
1290 self.assertFalse(os.path.exists(dirb))
1291 self.assertFalse(os.path.exists(dira))
1292 self.assertFalse(os.path.exists(support.TESTFN))
1293
1294 def test_remove_partial(self):
1295 dira = os.path.join(support.TESTFN, 'dira')
1296 os.mkdir(dira)
1297 dirb = os.path.join(dira, 'dirb')
1298 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001299 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001300 os.removedirs(dirb)
1301 self.assertFalse(os.path.exists(dirb))
1302 self.assertTrue(os.path.exists(dira))
1303 self.assertTrue(os.path.exists(support.TESTFN))
1304
1305 def test_remove_nothing(self):
1306 dira = os.path.join(support.TESTFN, 'dira')
1307 os.mkdir(dira)
1308 dirb = os.path.join(dira, 'dirb')
1309 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001310 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001311 with self.assertRaises(OSError):
1312 os.removedirs(dirb)
1313 self.assertTrue(os.path.exists(dirb))
1314 self.assertTrue(os.path.exists(dira))
1315 self.assertTrue(os.path.exists(support.TESTFN))
1316
1317
Guido van Rossume7ba4952007-06-06 23:52:48 +00001318class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001319 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001320 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001321 f.write(b'hello')
1322 f.close()
1323 with open(os.devnull, 'rb') as f:
1324 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001325
Andrew Svetlov405faed2012-12-25 12:18:09 +02001326
Guido van Rossume7ba4952007-06-06 23:52:48 +00001327class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001328 def test_urandom_length(self):
1329 self.assertEqual(len(os.urandom(0)), 0)
1330 self.assertEqual(len(os.urandom(1)), 1)
1331 self.assertEqual(len(os.urandom(10)), 10)
1332 self.assertEqual(len(os.urandom(100)), 100)
1333 self.assertEqual(len(os.urandom(1000)), 1000)
1334
1335 def test_urandom_value(self):
1336 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001337 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001338 data2 = os.urandom(16)
1339 self.assertNotEqual(data1, data2)
1340
1341 def get_urandom_subprocess(self, count):
1342 code = '\n'.join((
1343 'import os, sys',
1344 'data = os.urandom(%s)' % count,
1345 'sys.stdout.buffer.write(data)',
1346 'sys.stdout.buffer.flush()'))
1347 out = assert_python_ok('-c', code)
1348 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001349 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001350 return stdout
1351
1352 def test_urandom_subprocess(self):
1353 data1 = self.get_urandom_subprocess(16)
1354 data2 = self.get_urandom_subprocess(16)
1355 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001356
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001357
Victor Stinner9b1f4742016-09-06 16:18:52 -07001358@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1359class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001360 @classmethod
1361 def setUpClass(cls):
1362 try:
1363 os.getrandom(1)
1364 except OSError as exc:
1365 if exc.errno == errno.ENOSYS:
1366 # Python compiled on a more recent Linux version
1367 # than the current Linux kernel
1368 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1369 else:
1370 raise
1371
Victor Stinner9b1f4742016-09-06 16:18:52 -07001372 def test_getrandom_type(self):
1373 data = os.getrandom(16)
1374 self.assertIsInstance(data, bytes)
1375 self.assertEqual(len(data), 16)
1376
1377 def test_getrandom0(self):
1378 empty = os.getrandom(0)
1379 self.assertEqual(empty, b'')
1380
1381 def test_getrandom_random(self):
1382 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1383
1384 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1385 # resource /dev/random
1386
1387 def test_getrandom_nonblock(self):
1388 # The call must not fail. Check also that the flag exists
1389 try:
1390 os.getrandom(1, os.GRND_NONBLOCK)
1391 except BlockingIOError:
1392 # System urandom is not initialized yet
1393 pass
1394
1395 def test_getrandom_value(self):
1396 data1 = os.getrandom(16)
1397 data2 = os.getrandom(16)
1398 self.assertNotEqual(data1, data2)
1399
1400
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001401# os.urandom() doesn't use a file descriptor when it is implemented with the
1402# getentropy() function, the getrandom() function or the getrandom() syscall
1403OS_URANDOM_DONT_USE_FD = (
1404 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1405 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1406 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001407
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001408@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1409 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001410class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001411 @unittest.skipUnless(resource, "test requires the resource module")
1412 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001413 # Check urandom() failing when it is not able to open /dev/random.
1414 # We spawn a new process to make the test more robust (if getrlimit()
1415 # failed to restore the file descriptor limit after this, the whole
1416 # test suite would crash; this actually happened on the OS X Tiger
1417 # buildbot).
1418 code = """if 1:
1419 import errno
1420 import os
1421 import resource
1422
1423 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1424 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1425 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001426 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001427 except OSError as e:
1428 assert e.errno == errno.EMFILE, e.errno
1429 else:
1430 raise AssertionError("OSError not raised")
1431 """
1432 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001433
Antoine Pitroue472aea2014-04-26 14:33:03 +02001434 def test_urandom_fd_closed(self):
1435 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1436 # closed.
1437 code = """if 1:
1438 import os
1439 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001440 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001441 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001442 with test.support.SuppressCrashReport():
1443 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001444 sys.stdout.buffer.write(os.urandom(4))
1445 """
1446 rc, out, err = assert_python_ok('-Sc', code)
1447
1448 def test_urandom_fd_reopened(self):
1449 # Issue #21207: urandom() should detect its fd to /dev/urandom
1450 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001451 self.addCleanup(support.unlink, support.TESTFN)
1452 create_file(support.TESTFN, b"x" * 256)
1453
Antoine Pitroue472aea2014-04-26 14:33:03 +02001454 code = """if 1:
1455 import os
1456 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001457 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001458 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001459 with test.support.SuppressCrashReport():
1460 for fd in range(3, 256):
1461 try:
1462 os.close(fd)
1463 except OSError:
1464 pass
1465 else:
1466 # Found the urandom fd (XXX hopefully)
1467 break
1468 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001469 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001470 new_fd = f.fileno()
1471 # Issue #26935: posix allows new_fd and fd to be equal but
1472 # some libc implementations have dup2 return an error in this
1473 # case.
1474 if new_fd != fd:
1475 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001476 sys.stdout.buffer.write(os.urandom(4))
1477 sys.stdout.buffer.write(os.urandom(4))
1478 """.format(TESTFN=support.TESTFN)
1479 rc, out, err = assert_python_ok('-Sc', code)
1480 self.assertEqual(len(out), 8)
1481 self.assertNotEqual(out[0:4], out[4:8])
1482 rc, out2, err2 = assert_python_ok('-Sc', code)
1483 self.assertEqual(len(out2), 8)
1484 self.assertNotEqual(out2, out)
1485
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001486
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001487@contextlib.contextmanager
1488def _execvpe_mockup(defpath=None):
1489 """
1490 Stubs out execv and execve functions when used as context manager.
1491 Records exec calls. The mock execv and execve functions always raise an
1492 exception as they would normally never return.
1493 """
1494 # A list of tuples containing (function name, first arg, args)
1495 # of calls to execv or execve that have been made.
1496 calls = []
1497
1498 def mock_execv(name, *args):
1499 calls.append(('execv', name, args))
1500 raise RuntimeError("execv called")
1501
1502 def mock_execve(name, *args):
1503 calls.append(('execve', name, args))
1504 raise OSError(errno.ENOTDIR, "execve called")
1505
1506 try:
1507 orig_execv = os.execv
1508 orig_execve = os.execve
1509 orig_defpath = os.defpath
1510 os.execv = mock_execv
1511 os.execve = mock_execve
1512 if defpath is not None:
1513 os.defpath = defpath
1514 yield calls
1515 finally:
1516 os.execv = orig_execv
1517 os.execve = orig_execve
1518 os.defpath = orig_defpath
1519
Victor Stinner4659ccf2016-09-14 10:57:00 +02001520
Guido van Rossume7ba4952007-06-06 23:52:48 +00001521class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001522 @unittest.skipIf(USING_LINUXTHREADS,
1523 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001524 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001525 self.assertRaises(OSError, os.execvpe, 'no such app-',
1526 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001527
Steve Dowerbce26262016-11-19 19:17:26 -08001528 def test_execv_with_bad_arglist(self):
1529 self.assertRaises(ValueError, os.execv, 'notepad', ())
1530 self.assertRaises(ValueError, os.execv, 'notepad', [])
1531 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1532 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1533
Thomas Heller6790d602007-08-30 17:15:14 +00001534 def test_execvpe_with_bad_arglist(self):
1535 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001536 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1537 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001538
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001539 @unittest.skipUnless(hasattr(os, '_execvpe'),
1540 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001541 def _test_internal_execvpe(self, test_type):
1542 program_path = os.sep + 'absolutepath'
1543 if test_type is bytes:
1544 program = b'executable'
1545 fullpath = os.path.join(os.fsencode(program_path), program)
1546 native_fullpath = fullpath
1547 arguments = [b'progname', 'arg1', 'arg2']
1548 else:
1549 program = 'executable'
1550 arguments = ['progname', 'arg1', 'arg2']
1551 fullpath = os.path.join(program_path, program)
1552 if os.name != "nt":
1553 native_fullpath = os.fsencode(fullpath)
1554 else:
1555 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001556 env = {'spam': 'beans'}
1557
Victor Stinnerb745a742010-05-18 17:17:23 +00001558 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001559 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001560 self.assertRaises(RuntimeError,
1561 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001562 self.assertEqual(len(calls), 1)
1563 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1564
Victor Stinnerb745a742010-05-18 17:17:23 +00001565 # test os._execvpe() with a relative path:
1566 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001567 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001568 self.assertRaises(OSError,
1569 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001570 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001571 self.assertSequenceEqual(calls[0],
1572 ('execve', native_fullpath, (arguments, env)))
1573
1574 # test os._execvpe() with a relative path:
1575 # os.get_exec_path() reads the 'PATH' variable
1576 with _execvpe_mockup() as calls:
1577 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001578 if test_type is bytes:
1579 env_path[b'PATH'] = program_path
1580 else:
1581 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001582 self.assertRaises(OSError,
1583 os._execvpe, program, arguments, env=env_path)
1584 self.assertEqual(len(calls), 1)
1585 self.assertSequenceEqual(calls[0],
1586 ('execve', native_fullpath, (arguments, env_path)))
1587
1588 def test_internal_execvpe_str(self):
1589 self._test_internal_execvpe(str)
1590 if os.name != "nt":
1591 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001592
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001593 def test_execve_invalid_env(self):
1594 args = [sys.executable, '-c', 'pass']
1595
Ville Skyttä49b27342017-08-03 09:00:59 +03001596 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001597 newenv = os.environ.copy()
1598 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1599 with self.assertRaises(ValueError):
1600 os.execve(args[0], args, newenv)
1601
Ville Skyttä49b27342017-08-03 09:00:59 +03001602 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001603 newenv = os.environ.copy()
1604 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1605 with self.assertRaises(ValueError):
1606 os.execve(args[0], args, newenv)
1607
Ville Skyttä49b27342017-08-03 09:00:59 +03001608 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001609 newenv = os.environ.copy()
1610 newenv["FRUIT=ORANGE"] = "lemon"
1611 with self.assertRaises(ValueError):
1612 os.execve(args[0], args, newenv)
1613
Alexey Izbyshev83460312018-10-20 03:28:22 +03001614 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1615 def test_execve_with_empty_path(self):
1616 # bpo-32890: Check GetLastError() misuse
1617 try:
1618 os.execve('', ['arg'], {})
1619 except OSError as e:
1620 self.assertTrue(e.winerror is None or e.winerror != 0)
1621 else:
1622 self.fail('No OSError raised')
1623
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001624
Serhiy Storchaka43767632013-11-03 21:31:38 +02001625@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001626class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001627 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001628 try:
1629 os.stat(support.TESTFN)
1630 except FileNotFoundError:
1631 exists = False
1632 except OSError as exc:
1633 exists = True
1634 self.fail("file %s must not exist; os.stat failed with %s"
1635 % (support.TESTFN, exc))
1636 else:
1637 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001638
Thomas Wouters477c8d52006-05-27 19:21:47 +00001639 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001640 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001641
1642 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001643 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001644
1645 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001646 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001647
1648 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001649 self.addCleanup(support.unlink, support.TESTFN)
1650
Victor Stinnere77c9742016-03-25 10:28:23 +01001651 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001652 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001653
1654 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001655 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001656
Thomas Wouters477c8d52006-05-27 19:21:47 +00001657 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001658 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001659
Victor Stinnere77c9742016-03-25 10:28:23 +01001660
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001661class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001662 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001663 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1664 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001665 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001666 def get_single(f):
1667 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001668 if hasattr(os, f):
1669 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001670 return helper
1671 for f in singles:
1672 locals()["test_"+f] = get_single(f)
1673
Benjamin Peterson7522c742009-01-19 21:00:09 +00001674 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001675 try:
1676 f(support.make_bad_fd(), *args)
1677 except OSError as e:
1678 self.assertEqual(e.errno, errno.EBADF)
1679 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001680 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001681 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001682
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001686
Serhiy Storchaka43767632013-11-03 21:31:38 +02001687 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001688 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001689 fd = support.make_bad_fd()
1690 # Make sure none of the descriptors we are about to close are
1691 # currently valid (issue 6542).
1692 for i in range(10):
1693 try: os.fstat(fd+i)
1694 except OSError:
1695 pass
1696 else:
1697 break
1698 if i < 2:
1699 raise unittest.SkipTest(
1700 "Unable to acquire a range of invalid file descriptors")
1701 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001702
Serhiy Storchaka43767632013-11-03 21:31:38 +02001703 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001704 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001705 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001706
Serhiy Storchaka43767632013-11-03 21:31:38 +02001707 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001708 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001709 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001710
Serhiy Storchaka43767632013-11-03 21:31:38 +02001711 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001712 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001713 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001714
Serhiy Storchaka43767632013-11-03 21:31:38 +02001715 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001716 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001717 self.check(os.pathconf, "PC_NAME_MAX")
1718 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001719
Serhiy Storchaka43767632013-11-03 21:31:38 +02001720 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001721 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001722 self.check(os.truncate, 0)
1723 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001724
Serhiy Storchaka43767632013-11-03 21:31:38 +02001725 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001726 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001727 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001728
Serhiy Storchaka43767632013-11-03 21:31:38 +02001729 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001730 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001731 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001732
Victor Stinner57ddf782014-01-08 15:21:28 +01001733 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1734 def test_readv(self):
1735 buf = bytearray(10)
1736 self.check(os.readv, [buf])
1737
Serhiy Storchaka43767632013-11-03 21:31:38 +02001738 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001739 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001740 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001741
Serhiy Storchaka43767632013-11-03 21:31:38 +02001742 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001743 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001744 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001745
Victor Stinner57ddf782014-01-08 15:21:28 +01001746 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1747 def test_writev(self):
1748 self.check(os.writev, [b'abc'])
1749
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001750 def test_inheritable(self):
1751 self.check(os.get_inheritable)
1752 self.check(os.set_inheritable, True)
1753
1754 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1755 'needs os.get_blocking() and os.set_blocking()')
1756 def test_blocking(self):
1757 self.check(os.get_blocking)
1758 self.check(os.set_blocking, True)
1759
Brian Curtin1b9df392010-11-24 20:24:31 +00001760
1761class LinkTests(unittest.TestCase):
1762 def setUp(self):
1763 self.file1 = support.TESTFN
1764 self.file2 = os.path.join(support.TESTFN + "2")
1765
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001766 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001767 for file in (self.file1, self.file2):
1768 if os.path.exists(file):
1769 os.unlink(file)
1770
Brian Curtin1b9df392010-11-24 20:24:31 +00001771 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001772 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001773
xdegaye6a55d092017-11-12 17:57:04 +01001774 try:
1775 os.link(file1, file2)
1776 except PermissionError as e:
1777 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001778 with open(file1, "r") as f1, open(file2, "r") as f2:
1779 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1780
1781 def test_link(self):
1782 self._test_link(self.file1, self.file2)
1783
1784 def test_link_bytes(self):
1785 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1786 bytes(self.file2, sys.getfilesystemencoding()))
1787
Brian Curtinf498b752010-11-30 15:54:04 +00001788 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001789 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001790 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001791 except UnicodeError:
1792 raise unittest.SkipTest("Unable to encode for this platform.")
1793
Brian Curtinf498b752010-11-30 15:54:04 +00001794 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001795 self.file2 = self.file1 + "2"
1796 self._test_link(self.file1, self.file2)
1797
Serhiy Storchaka43767632013-11-03 21:31:38 +02001798@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1799class PosixUidGidTests(unittest.TestCase):
Victor Stinner876e82b2019-03-11 13:57:53 +01001800 # uid_t and gid_t are 32-bit unsigned integers on Linux
1801 UID_OVERFLOW = (1 << 32)
1802 GID_OVERFLOW = (1 << 32)
1803
Serhiy Storchaka43767632013-11-03 21:31:38 +02001804 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1805 def test_setuid(self):
1806 if os.getuid() != 0:
1807 self.assertRaises(OSError, os.setuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001808 self.assertRaises(TypeError, os.setuid, 'not an int')
1809 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001810
Serhiy Storchaka43767632013-11-03 21:31:38 +02001811 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1812 def test_setgid(self):
1813 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1814 self.assertRaises(OSError, os.setgid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001815 self.assertRaises(TypeError, os.setgid, 'not an int')
1816 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001817
Serhiy Storchaka43767632013-11-03 21:31:38 +02001818 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1819 def test_seteuid(self):
1820 if os.getuid() != 0:
1821 self.assertRaises(OSError, os.seteuid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001822 self.assertRaises(TypeError, os.setegid, 'not an int')
1823 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001824
Serhiy Storchaka43767632013-11-03 21:31:38 +02001825 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1826 def test_setegid(self):
1827 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1828 self.assertRaises(OSError, os.setegid, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001829 self.assertRaises(TypeError, os.setegid, 'not an int')
1830 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001831
Serhiy Storchaka43767632013-11-03 21:31:38 +02001832 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1833 def test_setreuid(self):
1834 if os.getuid() != 0:
1835 self.assertRaises(OSError, os.setreuid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001836 self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1837 self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1838 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1839 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001840
Serhiy Storchaka43767632013-11-03 21:31:38 +02001841 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1842 def test_setreuid_neg1(self):
1843 # Needs to accept -1. We run this in a subprocess to avoid
1844 # altering the test runner's process state (issue8045).
1845 subprocess.check_call([
1846 sys.executable, '-c',
1847 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001848
Serhiy Storchaka43767632013-11-03 21:31:38 +02001849 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1850 def test_setregid(self):
1851 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1852 self.assertRaises(OSError, os.setregid, 0, 0)
Victor Stinner876e82b2019-03-11 13:57:53 +01001853 self.assertRaises(TypeError, os.setregid, 'not an int', 0)
1854 self.assertRaises(TypeError, os.setregid, 0, 'not an int')
1855 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
1856 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001857
Serhiy Storchaka43767632013-11-03 21:31:38 +02001858 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1859 def test_setregid_neg1(self):
1860 # Needs to accept -1. We run this in a subprocess to avoid
1861 # altering the test runner's process state (issue8045).
1862 subprocess.check_call([
1863 sys.executable, '-c',
1864 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001865
Serhiy Storchaka43767632013-11-03 21:31:38 +02001866@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1867class Pep383Tests(unittest.TestCase):
1868 def setUp(self):
1869 if support.TESTFN_UNENCODABLE:
1870 self.dir = support.TESTFN_UNENCODABLE
1871 elif support.TESTFN_NONASCII:
1872 self.dir = support.TESTFN_NONASCII
1873 else:
1874 self.dir = support.TESTFN
1875 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001876
Serhiy Storchaka43767632013-11-03 21:31:38 +02001877 bytesfn = []
1878 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001879 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001880 fn = os.fsencode(fn)
1881 except UnicodeEncodeError:
1882 return
1883 bytesfn.append(fn)
1884 add_filename(support.TESTFN_UNICODE)
1885 if support.TESTFN_UNENCODABLE:
1886 add_filename(support.TESTFN_UNENCODABLE)
1887 if support.TESTFN_NONASCII:
1888 add_filename(support.TESTFN_NONASCII)
1889 if not bytesfn:
1890 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001891
Serhiy Storchaka43767632013-11-03 21:31:38 +02001892 self.unicodefn = set()
1893 os.mkdir(self.dir)
1894 try:
1895 for fn in bytesfn:
1896 support.create_empty_file(os.path.join(self.bdir, fn))
1897 fn = os.fsdecode(fn)
1898 if fn in self.unicodefn:
1899 raise ValueError("duplicate filename")
1900 self.unicodefn.add(fn)
1901 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001902 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001903 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001904
Serhiy Storchaka43767632013-11-03 21:31:38 +02001905 def tearDown(self):
1906 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001907
Serhiy Storchaka43767632013-11-03 21:31:38 +02001908 def test_listdir(self):
1909 expected = self.unicodefn
1910 found = set(os.listdir(self.dir))
1911 self.assertEqual(found, expected)
1912 # test listdir without arguments
1913 current_directory = os.getcwd()
1914 try:
1915 os.chdir(os.sep)
1916 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1917 finally:
1918 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001919
Serhiy Storchaka43767632013-11-03 21:31:38 +02001920 def test_open(self):
1921 for fn in self.unicodefn:
1922 f = open(os.path.join(self.dir, fn), 'rb')
1923 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001924
Serhiy Storchaka43767632013-11-03 21:31:38 +02001925 @unittest.skipUnless(hasattr(os, 'statvfs'),
1926 "need os.statvfs()")
1927 def test_statvfs(self):
1928 # issue #9645
1929 for fn in self.unicodefn:
1930 # should not fail with file not found error
1931 fullname = os.path.join(self.dir, fn)
1932 os.statvfs(fullname)
1933
1934 def test_stat(self):
1935 for fn in self.unicodefn:
1936 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001937
Brian Curtineb24d742010-04-12 17:16:38 +00001938@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1939class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001940 def _kill(self, sig):
1941 # Start sys.executable as a subprocess and communicate from the
1942 # subprocess to the parent that the interpreter is ready. When it
1943 # becomes ready, send *sig* via os.kill to the subprocess and check
1944 # that the return code is equal to *sig*.
1945 import ctypes
1946 from ctypes import wintypes
1947 import msvcrt
1948
1949 # Since we can't access the contents of the process' stdout until the
1950 # process has exited, use PeekNamedPipe to see what's inside stdout
1951 # without waiting. This is done so we can tell that the interpreter
1952 # is started and running at a point where it could handle a signal.
1953 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1954 PeekNamedPipe.restype = wintypes.BOOL
1955 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1956 ctypes.POINTER(ctypes.c_char), # stdout buf
1957 wintypes.DWORD, # Buffer size
1958 ctypes.POINTER(wintypes.DWORD), # bytes read
1959 ctypes.POINTER(wintypes.DWORD), # bytes avail
1960 ctypes.POINTER(wintypes.DWORD)) # bytes left
1961 msg = "running"
1962 proc = subprocess.Popen([sys.executable, "-c",
1963 "import sys;"
1964 "sys.stdout.write('{}');"
1965 "sys.stdout.flush();"
1966 "input()".format(msg)],
1967 stdout=subprocess.PIPE,
1968 stderr=subprocess.PIPE,
1969 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001970 self.addCleanup(proc.stdout.close)
1971 self.addCleanup(proc.stderr.close)
1972 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001973
1974 count, max = 0, 100
1975 while count < max and proc.poll() is None:
1976 # Create a string buffer to store the result of stdout from the pipe
1977 buf = ctypes.create_string_buffer(len(msg))
1978 # Obtain the text currently in proc.stdout
1979 # Bytes read/avail/left are left as NULL and unused
1980 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1981 buf, ctypes.sizeof(buf), None, None, None)
1982 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1983 if buf.value:
1984 self.assertEqual(msg, buf.value.decode())
1985 break
1986 time.sleep(0.1)
1987 count += 1
1988 else:
1989 self.fail("Did not receive communication from the subprocess")
1990
Brian Curtineb24d742010-04-12 17:16:38 +00001991 os.kill(proc.pid, sig)
1992 self.assertEqual(proc.wait(), sig)
1993
1994 def test_kill_sigterm(self):
1995 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001996 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001997
1998 def test_kill_int(self):
1999 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002000 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002001
2002 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002003 tagname = "test_os_%s" % uuid.uuid1()
2004 m = mmap.mmap(-1, 1, tagname)
2005 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002006 # Run a script which has console control handling enabled.
2007 proc = subprocess.Popen([sys.executable,
2008 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002009 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002010 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2011 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002012 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002013 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002014 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002015 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002016 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002017 count += 1
2018 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002019 # Forcefully kill the process if we weren't able to signal it.
2020 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002021 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002022 os.kill(proc.pid, event)
2023 # proc.send_signal(event) could also be done here.
2024 # Allow time for the signal to be passed and the process to exit.
2025 time.sleep(0.5)
2026 if not proc.poll():
2027 # Forcefully kill the process if we weren't able to signal it.
2028 os.kill(proc.pid, signal.SIGINT)
2029 self.fail("subprocess did not stop on {}".format(name))
2030
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002031 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002032 def test_CTRL_C_EVENT(self):
2033 from ctypes import wintypes
2034 import ctypes
2035
2036 # Make a NULL value by creating a pointer with no argument.
2037 NULL = ctypes.POINTER(ctypes.c_int)()
2038 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2039 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2040 wintypes.BOOL)
2041 SetConsoleCtrlHandler.restype = wintypes.BOOL
2042
2043 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002044 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002045 # by subprocesses.
2046 SetConsoleCtrlHandler(NULL, 0)
2047
2048 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2049
2050 def test_CTRL_BREAK_EVENT(self):
2051 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2052
2053
Brian Curtind40e6f72010-07-08 21:39:08 +00002054@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002055class Win32ListdirTests(unittest.TestCase):
2056 """Test listdir on Windows."""
2057
2058 def setUp(self):
2059 self.created_paths = []
2060 for i in range(2):
2061 dir_name = 'SUB%d' % i
2062 dir_path = os.path.join(support.TESTFN, dir_name)
2063 file_name = 'FILE%d' % i
2064 file_path = os.path.join(support.TESTFN, file_name)
2065 os.makedirs(dir_path)
2066 with open(file_path, 'w') as f:
2067 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2068 self.created_paths.extend([dir_name, file_name])
2069 self.created_paths.sort()
2070
2071 def tearDown(self):
2072 shutil.rmtree(support.TESTFN)
2073
2074 def test_listdir_no_extended_path(self):
2075 """Test when the path is not an "extended" path."""
2076 # unicode
2077 self.assertEqual(
2078 sorted(os.listdir(support.TESTFN)),
2079 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002080
Tim Golden781bbeb2013-10-25 20:24:06 +01002081 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002082 self.assertEqual(
2083 sorted(os.listdir(os.fsencode(support.TESTFN))),
2084 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002085
2086 def test_listdir_extended_path(self):
2087 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002088 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002089 # unicode
2090 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2091 self.assertEqual(
2092 sorted(os.listdir(path)),
2093 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002094
Tim Golden781bbeb2013-10-25 20:24:06 +01002095 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002096 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2097 self.assertEqual(
2098 sorted(os.listdir(path)),
2099 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002100
2101
Berker Peksage0b5b202018-08-15 13:03:41 +03002102@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2103class ReadlinkTests(unittest.TestCase):
2104 filelink = 'readlinktest'
2105 filelink_target = os.path.abspath(__file__)
2106 filelinkb = os.fsencode(filelink)
2107 filelinkb_target = os.fsencode(filelink_target)
2108
2109 def setUp(self):
2110 self.assertTrue(os.path.exists(self.filelink_target))
2111 self.assertTrue(os.path.exists(self.filelinkb_target))
2112 self.assertFalse(os.path.exists(self.filelink))
2113 self.assertFalse(os.path.exists(self.filelinkb))
2114
2115 def test_not_symlink(self):
2116 filelink_target = FakePath(self.filelink_target)
2117 self.assertRaises(OSError, os.readlink, self.filelink_target)
2118 self.assertRaises(OSError, os.readlink, filelink_target)
2119
2120 def test_missing_link(self):
2121 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2122 self.assertRaises(FileNotFoundError, os.readlink,
2123 FakePath('missing-link'))
2124
2125 @support.skip_unless_symlink
2126 def test_pathlike(self):
2127 os.symlink(self.filelink_target, self.filelink)
2128 self.addCleanup(support.unlink, self.filelink)
2129 filelink = FakePath(self.filelink)
2130 self.assertEqual(os.readlink(filelink), self.filelink_target)
2131
2132 @support.skip_unless_symlink
2133 def test_pathlike_bytes(self):
2134 os.symlink(self.filelinkb_target, self.filelinkb)
2135 self.addCleanup(support.unlink, self.filelinkb)
2136 path = os.readlink(FakePath(self.filelinkb))
2137 self.assertEqual(path, self.filelinkb_target)
2138 self.assertIsInstance(path, bytes)
2139
2140 @support.skip_unless_symlink
2141 def test_bytes(self):
2142 os.symlink(self.filelinkb_target, self.filelinkb)
2143 self.addCleanup(support.unlink, self.filelinkb)
2144 path = os.readlink(self.filelinkb)
2145 self.assertEqual(path, self.filelinkb_target)
2146 self.assertIsInstance(path, bytes)
2147
2148
Tim Golden781bbeb2013-10-25 20:24:06 +01002149@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002150@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002151class Win32SymlinkTests(unittest.TestCase):
2152 filelink = 'filelinktest'
2153 filelink_target = os.path.abspath(__file__)
2154 dirlink = 'dirlinktest'
2155 dirlink_target = os.path.dirname(filelink_target)
2156 missing_link = 'missing link'
2157
2158 def setUp(self):
2159 assert os.path.exists(self.dirlink_target)
2160 assert os.path.exists(self.filelink_target)
2161 assert not os.path.exists(self.dirlink)
2162 assert not os.path.exists(self.filelink)
2163 assert not os.path.exists(self.missing_link)
2164
2165 def tearDown(self):
2166 if os.path.exists(self.filelink):
2167 os.remove(self.filelink)
2168 if os.path.exists(self.dirlink):
2169 os.rmdir(self.dirlink)
2170 if os.path.lexists(self.missing_link):
2171 os.remove(self.missing_link)
2172
2173 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002174 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002175 self.assertTrue(os.path.exists(self.dirlink))
2176 self.assertTrue(os.path.isdir(self.dirlink))
2177 self.assertTrue(os.path.islink(self.dirlink))
2178 self.check_stat(self.dirlink, self.dirlink_target)
2179
2180 def test_file_link(self):
2181 os.symlink(self.filelink_target, self.filelink)
2182 self.assertTrue(os.path.exists(self.filelink))
2183 self.assertTrue(os.path.isfile(self.filelink))
2184 self.assertTrue(os.path.islink(self.filelink))
2185 self.check_stat(self.filelink, self.filelink_target)
2186
2187 def _create_missing_dir_link(self):
2188 'Create a "directory" link to a non-existent target'
2189 linkname = self.missing_link
2190 if os.path.lexists(linkname):
2191 os.remove(linkname)
2192 target = r'c:\\target does not exist.29r3c740'
2193 assert not os.path.exists(target)
2194 target_is_dir = True
2195 os.symlink(target, linkname, target_is_dir)
2196
2197 def test_remove_directory_link_to_missing_target(self):
2198 self._create_missing_dir_link()
2199 # For compatibility with Unix, os.remove will check the
2200 # directory status and call RemoveDirectory if the symlink
2201 # was created with target_is_dir==True.
2202 os.remove(self.missing_link)
2203
2204 @unittest.skip("currently fails; consider for improvement")
2205 def test_isdir_on_directory_link_to_missing_target(self):
2206 self._create_missing_dir_link()
2207 # consider having isdir return true for directory links
2208 self.assertTrue(os.path.isdir(self.missing_link))
2209
2210 @unittest.skip("currently fails; consider for improvement")
2211 def test_rmdir_on_directory_link_to_missing_target(self):
2212 self._create_missing_dir_link()
2213 # consider allowing rmdir to remove directory links
2214 os.rmdir(self.missing_link)
2215
2216 def check_stat(self, link, target):
2217 self.assertEqual(os.stat(link), os.stat(target))
2218 self.assertNotEqual(os.lstat(link), os.stat(link))
2219
Brian Curtind25aef52011-06-13 15:16:04 -05002220 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002221 self.assertEqual(os.stat(bytes_link), os.stat(target))
2222 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002223
2224 def test_12084(self):
2225 level1 = os.path.abspath(support.TESTFN)
2226 level2 = os.path.join(level1, "level2")
2227 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002228 self.addCleanup(support.rmtree, level1)
2229
2230 os.mkdir(level1)
2231 os.mkdir(level2)
2232 os.mkdir(level3)
2233
2234 file1 = os.path.abspath(os.path.join(level1, "file1"))
2235 create_file(file1)
2236
2237 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002238 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002239 os.chdir(level2)
2240 link = os.path.join(level2, "link")
2241 os.symlink(os.path.relpath(file1), "link")
2242 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002243
Victor Stinnerae39d232016-03-24 17:12:55 +01002244 # Check os.stat calls from the same dir as the link
2245 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002246
Victor Stinnerae39d232016-03-24 17:12:55 +01002247 # Check os.stat calls from a dir below the link
2248 os.chdir(level1)
2249 self.assertEqual(os.stat(file1),
2250 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002251
Victor Stinnerae39d232016-03-24 17:12:55 +01002252 # Check os.stat calls from a dir above the link
2253 os.chdir(level3)
2254 self.assertEqual(os.stat(file1),
2255 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002256 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002257 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002258
SSE43c34aad2018-02-13 00:10:35 +07002259 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2260 and os.path.exists(r'C:\ProgramData'),
2261 'Test directories not found')
2262 def test_29248(self):
2263 # os.symlink() calls CreateSymbolicLink, which creates
2264 # the reparse data buffer with the print name stored
2265 # first, so the offset is always 0. CreateSymbolicLink
2266 # stores the "PrintName" DOS path (e.g. "C:\") first,
2267 # with an offset of 0, followed by the "SubstituteName"
2268 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2269 # the other hand, seems to have been created manually
2270 # with an inverted order.
2271 target = os.readlink(r'C:\Users\All Users')
2272 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2273
Steve Dower6921e732018-03-05 14:26:08 -08002274 def test_buffer_overflow(self):
2275 # Older versions would have a buffer overflow when detecting
2276 # whether a link source was a directory. This test ensures we
2277 # no longer crash, but does not otherwise validate the behavior
2278 segment = 'X' * 27
2279 path = os.path.join(*[segment] * 10)
2280 test_cases = [
2281 # overflow with absolute src
2282 ('\\' + path, segment),
2283 # overflow dest with relative src
2284 (segment, path),
2285 # overflow when joining src
2286 (path[:180], path[:180]),
2287 ]
2288 for src, dest in test_cases:
2289 try:
2290 os.symlink(src, dest)
2291 except FileNotFoundError:
2292 pass
2293 else:
2294 try:
2295 os.remove(dest)
2296 except OSError:
2297 pass
2298 # Also test with bytes, since that is a separate code path.
2299 try:
2300 os.symlink(os.fsencode(src), os.fsencode(dest))
2301 except FileNotFoundError:
2302 pass
2303 else:
2304 try:
2305 os.remove(dest)
2306 except OSError:
2307 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002308
Tim Golden0321cf22014-05-05 19:46:17 +01002309@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2310class Win32JunctionTests(unittest.TestCase):
2311 junction = 'junctiontest'
2312 junction_target = os.path.dirname(os.path.abspath(__file__))
2313
2314 def setUp(self):
2315 assert os.path.exists(self.junction_target)
2316 assert not os.path.exists(self.junction)
2317
2318 def tearDown(self):
2319 if os.path.exists(self.junction):
2320 # os.rmdir delegates to Windows' RemoveDirectoryW,
2321 # which removes junction points safely.
2322 os.rmdir(self.junction)
2323
2324 def test_create_junction(self):
2325 _winapi.CreateJunction(self.junction_target, self.junction)
2326 self.assertTrue(os.path.exists(self.junction))
2327 self.assertTrue(os.path.isdir(self.junction))
2328
2329 # Junctions are not recognized as links.
2330 self.assertFalse(os.path.islink(self.junction))
2331
2332 def test_unlink_removes_junction(self):
2333 _winapi.CreateJunction(self.junction_target, self.junction)
2334 self.assertTrue(os.path.exists(self.junction))
2335
2336 os.unlink(self.junction)
2337 self.assertFalse(os.path.exists(self.junction))
2338
Mark Becwarb82bfac2019-02-02 16:08:23 -05002339@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2340class Win32NtTests(unittest.TestCase):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002341 def test_getfinalpathname_handles(self):
Berker Peksag6ef726a2019-04-22 18:46:28 +03002342 nt = support.import_module('nt')
2343 ctypes = support.import_module('ctypes')
2344 import ctypes.wintypes
Mark Becwarb82bfac2019-02-02 16:08:23 -05002345
2346 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2347 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2348
2349 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2350 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2351 ctypes.wintypes.LPDWORD)
2352
2353 # This is a pseudo-handle that doesn't need to be closed
2354 hproc = kernel.GetCurrentProcess()
2355
2356 handle_count = ctypes.wintypes.DWORD()
2357 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2358 self.assertEqual(1, ok)
2359
2360 before_count = handle_count.value
2361
2362 # The first two test the error path, __file__ tests the success path
Berker Peksag6ef726a2019-04-22 18:46:28 +03002363 filenames = [
2364 r'\\?\C:',
2365 r'\\?\NUL',
2366 r'\\?\CONIN',
2367 __file__,
2368 ]
Mark Becwarb82bfac2019-02-02 16:08:23 -05002369
Berker Peksag6ef726a2019-04-22 18:46:28 +03002370 for _ in range(10):
Mark Becwarb82bfac2019-02-02 16:08:23 -05002371 for name in filenames:
2372 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002373 nt._getfinalpathname(name)
2374 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002375 # Failure is expected
2376 pass
2377 try:
Berker Peksag6ef726a2019-04-22 18:46:28 +03002378 os.stat(name)
2379 except Exception:
Mark Becwarb82bfac2019-02-02 16:08:23 -05002380 pass
2381
2382 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2383 self.assertEqual(1, ok)
2384
2385 handle_delta = handle_count.value - before_count
2386
2387 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002388
Jason R. Coombs3a092862013-05-27 23:21:28 -04002389@support.skip_unless_symlink
2390class NonLocalSymlinkTests(unittest.TestCase):
2391
2392 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002393 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002394 Create this structure:
2395
2396 base
2397 \___ some_dir
2398 """
2399 os.makedirs('base/some_dir')
2400
2401 def tearDown(self):
2402 shutil.rmtree('base')
2403
2404 def test_directory_link_nonlocal(self):
2405 """
2406 The symlink target should resolve relative to the link, not relative
2407 to the current directory.
2408
2409 Then, link base/some_link -> base/some_dir and ensure that some_link
2410 is resolved as a directory.
2411
2412 In issue13772, it was discovered that directory detection failed if
2413 the symlink target was not specified relative to the current
2414 directory, which was a defect in the implementation.
2415 """
2416 src = os.path.join('base', 'some_link')
2417 os.symlink('some_dir', src)
2418 assert os.path.isdir(src)
2419
2420
Victor Stinnere8d51452010-08-19 01:05:19 +00002421class FSEncodingTests(unittest.TestCase):
2422 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002423 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2424 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002425
Victor Stinnere8d51452010-08-19 01:05:19 +00002426 def test_identity(self):
2427 # assert fsdecode(fsencode(x)) == x
2428 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2429 try:
2430 bytesfn = os.fsencode(fn)
2431 except UnicodeEncodeError:
2432 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002433 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002434
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002435
Brett Cannonefb00c02012-02-29 18:31:31 -05002436
2437class DeviceEncodingTests(unittest.TestCase):
2438
2439 def test_bad_fd(self):
2440 # Return None when an fd doesn't actually exist.
2441 self.assertIsNone(os.device_encoding(123456))
2442
Paul Monson62dfd7d2019-04-25 11:36:45 -07002443 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
Philip Jenveye308b7c2012-02-29 16:16:15 -08002444 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002445 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002446 def test_device_encoding(self):
2447 encoding = os.device_encoding(0)
2448 self.assertIsNotNone(encoding)
2449 self.assertTrue(codecs.lookup(encoding))
2450
2451
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002452class PidTests(unittest.TestCase):
2453 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2454 def test_getppid(self):
2455 p = subprocess.Popen([sys.executable, '-c',
2456 'import os; print(os.getppid())'],
2457 stdout=subprocess.PIPE)
2458 stdout, _ = p.communicate()
2459 # We are the parent of our subprocess
2460 self.assertEqual(int(stdout), os.getpid())
2461
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002462 def test_waitpid(self):
2463 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002464 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002465 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002466 status = os.waitpid(pid, 0)
2467 self.assertEqual(status, (pid, 0))
2468
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002469
Victor Stinner4659ccf2016-09-14 10:57:00 +02002470class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002471 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002472 self.exitcode = 17
2473
2474 filename = support.TESTFN
2475 self.addCleanup(support.unlink, filename)
2476
2477 if not with_env:
2478 code = 'import sys; sys.exit(%s)' % self.exitcode
2479 else:
2480 self.env = dict(os.environ)
2481 # create an unique key
2482 self.key = str(uuid.uuid4())
2483 self.env[self.key] = self.key
2484 # read the variable from os.environ to check that it exists
2485 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2486 % (self.key, self.exitcode))
2487
2488 with open(filename, "w") as fp:
2489 fp.write(code)
2490
Berker Peksag81816462016-09-15 20:19:47 +03002491 args = [sys.executable, filename]
2492 if use_bytes:
2493 args = [os.fsencode(a) for a in args]
2494 self.env = {os.fsencode(k): os.fsencode(v)
2495 for k, v in self.env.items()}
2496
2497 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002498
Berker Peksag4af23d72016-09-15 20:32:44 +03002499 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002500 def test_spawnl(self):
2501 args = self.create_args()
2502 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2503 self.assertEqual(exitcode, self.exitcode)
2504
Berker Peksag4af23d72016-09-15 20:32:44 +03002505 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002506 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002507 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002508 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2509 self.assertEqual(exitcode, self.exitcode)
2510
Berker Peksag4af23d72016-09-15 20:32:44 +03002511 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002512 def test_spawnlp(self):
2513 args = self.create_args()
2514 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2515 self.assertEqual(exitcode, self.exitcode)
2516
Berker Peksag4af23d72016-09-15 20:32:44 +03002517 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002518 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002519 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002520 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2521 self.assertEqual(exitcode, self.exitcode)
2522
Berker Peksag4af23d72016-09-15 20:32:44 +03002523 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002524 def test_spawnv(self):
2525 args = self.create_args()
2526 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2527 self.assertEqual(exitcode, self.exitcode)
2528
Berker Peksag4af23d72016-09-15 20:32:44 +03002529 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002530 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002531 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002532 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2533 self.assertEqual(exitcode, self.exitcode)
2534
Berker Peksag4af23d72016-09-15 20:32:44 +03002535 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002536 def test_spawnvp(self):
2537 args = self.create_args()
2538 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2539 self.assertEqual(exitcode, self.exitcode)
2540
Berker Peksag4af23d72016-09-15 20:32:44 +03002541 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002542 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002543 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002544 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2545 self.assertEqual(exitcode, self.exitcode)
2546
Berker Peksag4af23d72016-09-15 20:32:44 +03002547 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002548 def test_nowait(self):
2549 args = self.create_args()
2550 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2551 result = os.waitpid(pid, 0)
2552 self.assertEqual(result[0], pid)
2553 status = result[1]
2554 if hasattr(os, 'WIFEXITED'):
2555 self.assertTrue(os.WIFEXITED(status))
2556 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2557 else:
2558 self.assertEqual(status, self.exitcode << 8)
2559
Berker Peksag4af23d72016-09-15 20:32:44 +03002560 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002561 def test_spawnve_bytes(self):
2562 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2563 args = self.create_args(with_env=True, use_bytes=True)
2564 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2565 self.assertEqual(exitcode, self.exitcode)
2566
Steve Dower859fd7b2016-11-19 18:53:19 -08002567 @requires_os_func('spawnl')
2568 def test_spawnl_noargs(self):
2569 args = self.create_args()
2570 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002571 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002572
2573 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002574 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002575 args = self.create_args()
2576 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002577 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002578
2579 @requires_os_func('spawnv')
2580 def test_spawnv_noargs(self):
2581 args = self.create_args()
2582 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2583 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002584 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2585 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002586
2587 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002588 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002589 args = self.create_args()
2590 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2591 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002592 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2593 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002594
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002595 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002596 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002597
Ville Skyttä49b27342017-08-03 09:00:59 +03002598 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002599 newenv = os.environ.copy()
2600 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2601 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002602 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002603 except ValueError:
2604 pass
2605 else:
2606 self.assertEqual(exitcode, 127)
2607
Ville Skyttä49b27342017-08-03 09:00:59 +03002608 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002609 newenv = os.environ.copy()
2610 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2611 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002612 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002613 except ValueError:
2614 pass
2615 else:
2616 self.assertEqual(exitcode, 127)
2617
Ville Skyttä49b27342017-08-03 09:00:59 +03002618 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002619 newenv = os.environ.copy()
2620 newenv["FRUIT=ORANGE"] = "lemon"
2621 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002622 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002623 except ValueError:
2624 pass
2625 else:
2626 self.assertEqual(exitcode, 127)
2627
Ville Skyttä49b27342017-08-03 09:00:59 +03002628 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002629 filename = support.TESTFN
2630 self.addCleanup(support.unlink, filename)
2631 with open(filename, "w") as fp:
2632 fp.write('import sys, os\n'
2633 'if os.getenv("FRUIT") != "orange=lemon":\n'
2634 ' raise AssertionError')
2635 args = [sys.executable, filename]
2636 newenv = os.environ.copy()
2637 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002638 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002639 self.assertEqual(exitcode, 0)
2640
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002641 @requires_os_func('spawnve')
2642 def test_spawnve_invalid_env(self):
2643 self._test_invalid_env(os.spawnve)
2644
2645 @requires_os_func('spawnvpe')
2646 def test_spawnvpe_invalid_env(self):
2647 self._test_invalid_env(os.spawnvpe)
2648
Serhiy Storchaka77703942017-06-25 07:33:01 +03002649
Brian Curtin0151b8e2010-09-24 13:43:43 +00002650# The introduction of this TestCase caused at least two different errors on
2651# *nix buildbots. Temporarily skip this to let the buildbots move along.
2652@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002653@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2654class LoginTests(unittest.TestCase):
2655 def test_getlogin(self):
2656 user_name = os.getlogin()
2657 self.assertNotEqual(len(user_name), 0)
2658
2659
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002660@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2661 "needs os.getpriority and os.setpriority")
2662class ProgramPriorityTests(unittest.TestCase):
2663 """Tests for os.getpriority() and os.setpriority()."""
2664
2665 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002666
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002667 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2668 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2669 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002670 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2671 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002672 raise unittest.SkipTest("unable to reliably test setpriority "
2673 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002674 else:
2675 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002676 finally:
2677 try:
2678 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2679 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002680 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002681 raise
2682
2683
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002684class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002685
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002686 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002687
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002688 def __init__(self, conn):
2689 asynchat.async_chat.__init__(self, conn)
2690 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002691 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002692 self.closed = False
2693 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002694
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002695 def handle_read(self):
2696 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002697 if self.accumulate:
2698 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002699
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 def get_data(self):
2701 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002702
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002703 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002704 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002705 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002706
2707 def handle_error(self):
2708 raise
2709
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002710 def __init__(self, address):
2711 threading.Thread.__init__(self)
2712 asyncore.dispatcher.__init__(self)
2713 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2714 self.bind(address)
2715 self.listen(5)
2716 self.host, self.port = self.socket.getsockname()[:2]
2717 self.handler_instance = None
2718 self._active = False
2719 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002720
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002721 # --- public API
2722
2723 @property
2724 def running(self):
2725 return self._active
2726
2727 def start(self):
2728 assert not self.running
2729 self.__flag = threading.Event()
2730 threading.Thread.start(self)
2731 self.__flag.wait()
2732
2733 def stop(self):
2734 assert self.running
2735 self._active = False
2736 self.join()
2737
2738 def wait(self):
2739 # wait for handler connection to be closed, then stop the server
2740 while not getattr(self.handler_instance, "closed", False):
2741 time.sleep(0.001)
2742 self.stop()
2743
2744 # --- internals
2745
2746 def run(self):
2747 self._active = True
2748 self.__flag.set()
2749 while self._active and asyncore.socket_map:
2750 self._active_lock.acquire()
2751 asyncore.loop(timeout=0.001, count=1)
2752 self._active_lock.release()
2753 asyncore.close_all()
2754
2755 def handle_accept(self):
2756 conn, addr = self.accept()
2757 self.handler_instance = self.Handler(conn)
2758
2759 def handle_connect(self):
2760 self.close()
2761 handle_read = handle_connect
2762
2763 def writable(self):
2764 return 0
2765
2766 def handle_error(self):
2767 raise
2768
2769
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002770@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2771class TestSendfile(unittest.TestCase):
2772
Victor Stinner8c663fd2017-11-08 14:44:44 -08002773 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002774 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002775 not sys.platform.startswith("solaris") and \
2776 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002777 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2778 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002779 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2780 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002781
2782 @classmethod
2783 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002784 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002785 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002786
2787 @classmethod
2788 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002789 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002790 support.unlink(support.TESTFN)
2791
2792 def setUp(self):
2793 self.server = SendfileTestServer((support.HOST, 0))
2794 self.server.start()
2795 self.client = socket.socket()
2796 self.client.connect((self.server.host, self.server.port))
2797 self.client.settimeout(1)
2798 # synchronize by waiting for "220 ready" response
2799 self.client.recv(1024)
2800 self.sockno = self.client.fileno()
2801 self.file = open(support.TESTFN, 'rb')
2802 self.fileno = self.file.fileno()
2803
2804 def tearDown(self):
2805 self.file.close()
2806 self.client.close()
2807 if self.server.running:
2808 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002809 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002810
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002811 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002812 """A higher level wrapper representing how an application is
2813 supposed to use sendfile().
2814 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002815 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002816 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002817 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002818 except OSError as err:
2819 if err.errno == errno.ECONNRESET:
2820 # disconnected
2821 raise
2822 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2823 # we have to retry send data
2824 continue
2825 else:
2826 raise
2827
2828 def test_send_whole_file(self):
2829 # normal send
2830 total_sent = 0
2831 offset = 0
2832 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002833 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002834 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2835 if sent == 0:
2836 break
2837 offset += sent
2838 total_sent += sent
2839 self.assertTrue(sent <= nbytes)
2840 self.assertEqual(offset, total_sent)
2841
2842 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002843 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002844 self.client.close()
2845 self.server.wait()
2846 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002847 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002848 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002849
2850 def test_send_at_certain_offset(self):
2851 # start sending a file at a certain offset
2852 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002853 offset = len(self.DATA) // 2
2854 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002855 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002856 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002857 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2858 if sent == 0:
2859 break
2860 offset += sent
2861 total_sent += sent
2862 self.assertTrue(sent <= nbytes)
2863
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002864 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002865 self.client.close()
2866 self.server.wait()
2867 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002868 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002869 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002870 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002871 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002872
2873 def test_offset_overflow(self):
2874 # specify an offset > file size
2875 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002876 try:
2877 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2878 except OSError as e:
2879 # Solaris can raise EINVAL if offset >= file length, ignore.
2880 if e.errno != errno.EINVAL:
2881 raise
2882 else:
2883 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002884 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002885 self.client.close()
2886 self.server.wait()
2887 data = self.server.handler_instance.get_data()
2888 self.assertEqual(data, b'')
2889
2890 def test_invalid_offset(self):
2891 with self.assertRaises(OSError) as cm:
2892 os.sendfile(self.sockno, self.fileno, -1, 4096)
2893 self.assertEqual(cm.exception.errno, errno.EINVAL)
2894
Martin Panterbf19d162015-09-09 01:01:13 +00002895 def test_keywords(self):
2896 # Keyword arguments should be supported
2897 os.sendfile(out=self.sockno, offset=0, count=4096,
2898 **{'in': self.fileno})
2899 if self.SUPPORT_HEADERS_TRAILERS:
2900 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002901 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002902
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002903 # --- headers / trailers tests
2904
Serhiy Storchaka43767632013-11-03 21:31:38 +02002905 @requires_headers_trailers
2906 def test_headers(self):
2907 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002908 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02002909 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002910 headers=[b"x" * 512, b"y" * 256])
2911 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002912 total_sent += sent
2913 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002914 while total_sent < len(expected_data):
2915 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002916 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2917 offset, nbytes)
2918 if sent == 0:
2919 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002920 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002921 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002922 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002923
Serhiy Storchaka43767632013-11-03 21:31:38 +02002924 self.assertEqual(total_sent, len(expected_data))
2925 self.client.close()
2926 self.server.wait()
2927 data = self.server.handler_instance.get_data()
2928 self.assertEqual(hash(data), hash(expected_data))
2929
2930 @requires_headers_trailers
2931 def test_trailers(self):
2932 TESTFN2 = support.TESTFN + "2"
2933 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002934
2935 self.addCleanup(support.unlink, TESTFN2)
2936 create_file(TESTFN2, file_data)
2937
2938 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002939 os.sendfile(self.sockno, f.fileno(), 0, 5,
2940 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002941 self.client.close()
2942 self.server.wait()
2943 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002944 self.assertEqual(data, b"abcde123456789")
2945
2946 @requires_headers_trailers
2947 @requires_32b
2948 def test_headers_overflow_32bits(self):
2949 self.server.handler_instance.accumulate = False
2950 with self.assertRaises(OSError) as cm:
2951 os.sendfile(self.sockno, self.fileno, 0, 0,
2952 headers=[b"x" * 2**16] * 2**15)
2953 self.assertEqual(cm.exception.errno, errno.EINVAL)
2954
2955 @requires_headers_trailers
2956 @requires_32b
2957 def test_trailers_overflow_32bits(self):
2958 self.server.handler_instance.accumulate = False
2959 with self.assertRaises(OSError) as cm:
2960 os.sendfile(self.sockno, self.fileno, 0, 0,
2961 trailers=[b"x" * 2**16] * 2**15)
2962 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002963
Serhiy Storchaka43767632013-11-03 21:31:38 +02002964 @requires_headers_trailers
2965 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2966 'test needs os.SF_NODISKIO')
2967 def test_flags(self):
2968 try:
2969 os.sendfile(self.sockno, self.fileno, 0, 4096,
2970 flags=os.SF_NODISKIO)
2971 except OSError as err:
2972 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2973 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002974
2975
Larry Hastings9cf065c2012-06-22 16:30:09 -07002976def supports_extended_attributes():
2977 if not hasattr(os, "setxattr"):
2978 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002979
Larry Hastings9cf065c2012-06-22 16:30:09 -07002980 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002981 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002982 try:
2983 os.setxattr(fp.fileno(), b"user.test", b"")
2984 except OSError:
2985 return False
2986 finally:
2987 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002988
2989 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002990
2991
2992@unittest.skipUnless(supports_extended_attributes(),
2993 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002994# Kernels < 2.6.39 don't respect setxattr flags.
2995@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002996class ExtendedAttributeTests(unittest.TestCase):
2997
Larry Hastings9cf065c2012-06-22 16:30:09 -07002998 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002999 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01003000 self.addCleanup(support.unlink, fn)
3001 create_file(fn)
3002
Benjamin Peterson799bd802011-08-31 22:15:17 -04003003 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003004 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003005 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003006
Victor Stinnerf12e5062011-10-16 22:12:03 +02003007 init_xattr = listxattr(fn)
3008 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003009
Larry Hastings9cf065c2012-06-22 16:30:09 -07003010 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003011 xattr = set(init_xattr)
3012 xattr.add("user.test")
3013 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003014 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3015 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3016 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003017
Benjamin Peterson799bd802011-08-31 22:15:17 -04003018 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003019 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003020 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003021
Benjamin Peterson799bd802011-08-31 22:15:17 -04003022 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003023 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003024 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003025
Larry Hastings9cf065c2012-06-22 16:30:09 -07003026 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003027 xattr.add("user.test2")
3028 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003029 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003030
Benjamin Peterson799bd802011-08-31 22:15:17 -04003031 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003032 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003033 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003034
Victor Stinnerf12e5062011-10-16 22:12:03 +02003035 xattr.remove("user.test")
3036 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003037 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3038 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3039 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3040 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003041 many = sorted("user.test{}".format(i) for i in range(100))
3042 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003043 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003044 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003045
Larry Hastings9cf065c2012-06-22 16:30:09 -07003046 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003047 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003048 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003049
3050 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3051 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003052
3053 def test_simple(self):
3054 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3055 os.listxattr)
3056
3057 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003058 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3059 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003060
3061 def test_fds(self):
3062 def getxattr(path, *args):
3063 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003064 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003065 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003066 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003067 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003068 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003069 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003070 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003071 def listxattr(path, *args):
3072 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003073 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003074 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3075
3076
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003077@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3078class TermsizeTests(unittest.TestCase):
3079 def test_does_not_crash(self):
3080 """Check if get_terminal_size() returns a meaningful value.
3081
3082 There's no easy portable way to actually check the size of the
3083 terminal, so let's check if it returns something sensible instead.
3084 """
3085 try:
3086 size = os.get_terminal_size()
3087 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003088 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003089 # Under win32 a generic OSError can be thrown if the
3090 # handle cannot be retrieved
3091 self.skipTest("failed to query terminal size")
3092 raise
3093
Antoine Pitroucfade362012-02-08 23:48:59 +01003094 self.assertGreaterEqual(size.columns, 0)
3095 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003096
3097 def test_stty_match(self):
3098 """Check if stty returns the same results
3099
3100 stty actually tests stdin, so get_terminal_size is invoked on
3101 stdin explicitly. If stty succeeded, then get_terminal_size()
3102 should work too.
3103 """
3104 try:
3105 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003106 except (FileNotFoundError, subprocess.CalledProcessError,
3107 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003108 self.skipTest("stty invocation failed")
3109 expected = (int(size[1]), int(size[0])) # reversed order
3110
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003111 try:
3112 actual = os.get_terminal_size(sys.__stdin__.fileno())
3113 except OSError as e:
3114 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3115 # Under win32 a generic OSError can be thrown if the
3116 # handle cannot be retrieved
3117 self.skipTest("failed to query terminal size")
3118 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003119 self.assertEqual(expected, actual)
3120
3121
Victor Stinner292c8352012-10-30 02:17:38 +01003122class OSErrorTests(unittest.TestCase):
3123 def setUp(self):
3124 class Str(str):
3125 pass
3126
Victor Stinnerafe17062012-10-31 22:47:43 +01003127 self.bytes_filenames = []
3128 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003129 if support.TESTFN_UNENCODABLE is not None:
3130 decoded = support.TESTFN_UNENCODABLE
3131 else:
3132 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003133 self.unicode_filenames.append(decoded)
3134 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003135 if support.TESTFN_UNDECODABLE is not None:
3136 encoded = support.TESTFN_UNDECODABLE
3137 else:
3138 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003139 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003140 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003141 self.bytes_filenames.append(memoryview(encoded))
3142
3143 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003144
3145 def test_oserror_filename(self):
3146 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003147 (self.filenames, os.chdir,),
3148 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003149 (self.filenames, os.lstat,),
3150 (self.filenames, os.open, os.O_RDONLY),
3151 (self.filenames, os.rmdir,),
3152 (self.filenames, os.stat,),
3153 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003154 ]
3155 if sys.platform == "win32":
3156 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003157 (self.bytes_filenames, os.rename, b"dst"),
3158 (self.bytes_filenames, os.replace, b"dst"),
3159 (self.unicode_filenames, os.rename, "dst"),
3160 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003161 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003162 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003163 else:
3164 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003165 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003166 (self.filenames, os.rename, "dst"),
3167 (self.filenames, os.replace, "dst"),
3168 ))
3169 if hasattr(os, "chown"):
3170 funcs.append((self.filenames, os.chown, 0, 0))
3171 if hasattr(os, "lchown"):
3172 funcs.append((self.filenames, os.lchown, 0, 0))
3173 if hasattr(os, "truncate"):
3174 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003175 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003176 funcs.append((self.filenames, os.chflags, 0))
3177 if hasattr(os, "lchflags"):
3178 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003179 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003180 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003181 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003182 if sys.platform == "win32":
3183 funcs.append((self.bytes_filenames, os.link, b"dst"))
3184 funcs.append((self.unicode_filenames, os.link, "dst"))
3185 else:
3186 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003187 if hasattr(os, "listxattr"):
3188 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003189 (self.filenames, os.listxattr,),
3190 (self.filenames, os.getxattr, "user.test"),
3191 (self.filenames, os.setxattr, "user.test", b'user'),
3192 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003193 ))
3194 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003195 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003196 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003197 if sys.platform == "win32":
3198 funcs.append((self.unicode_filenames, os.readlink,))
3199 else:
3200 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003201
Steve Dowercc16be82016-09-08 10:35:16 -07003202
Victor Stinnerafe17062012-10-31 22:47:43 +01003203 for filenames, func, *func_args in funcs:
3204 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003205 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003206 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003207 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003208 else:
3209 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3210 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003211 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003212 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003213 except UnicodeDecodeError:
3214 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003215 else:
3216 self.fail("No exception thrown by {}".format(func))
3217
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003218class CPUCountTests(unittest.TestCase):
3219 def test_cpu_count(self):
3220 cpus = os.cpu_count()
3221 if cpus is not None:
3222 self.assertIsInstance(cpus, int)
3223 self.assertGreater(cpus, 0)
3224 else:
3225 self.skipTest("Could not determine the number of CPUs")
3226
Victor Stinnerdaf45552013-08-28 00:53:59 +02003227
3228class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003229 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003230 fd = os.open(__file__, os.O_RDONLY)
3231 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003232 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003233
Victor Stinnerdaf45552013-08-28 00:53:59 +02003234 os.set_inheritable(fd, True)
3235 self.assertEqual(os.get_inheritable(fd), True)
3236
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003237 @unittest.skipIf(fcntl is None, "need fcntl")
3238 def test_get_inheritable_cloexec(self):
3239 fd = os.open(__file__, os.O_RDONLY)
3240 self.addCleanup(os.close, fd)
3241 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003242
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003243 # clear FD_CLOEXEC flag
3244 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3245 flags &= ~fcntl.FD_CLOEXEC
3246 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003247
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003248 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003249
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003250 @unittest.skipIf(fcntl is None, "need fcntl")
3251 def test_set_inheritable_cloexec(self):
3252 fd = os.open(__file__, os.O_RDONLY)
3253 self.addCleanup(os.close, fd)
3254 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3255 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003256
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003257 os.set_inheritable(fd, True)
3258 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3259 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003260
Victor Stinnerdaf45552013-08-28 00:53:59 +02003261 def test_open(self):
3262 fd = os.open(__file__, os.O_RDONLY)
3263 self.addCleanup(os.close, fd)
3264 self.assertEqual(os.get_inheritable(fd), False)
3265
3266 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3267 def test_pipe(self):
3268 rfd, wfd = os.pipe()
3269 self.addCleanup(os.close, rfd)
3270 self.addCleanup(os.close, wfd)
3271 self.assertEqual(os.get_inheritable(rfd), False)
3272 self.assertEqual(os.get_inheritable(wfd), False)
3273
3274 def test_dup(self):
3275 fd1 = os.open(__file__, os.O_RDONLY)
3276 self.addCleanup(os.close, fd1)
3277
3278 fd2 = os.dup(fd1)
3279 self.addCleanup(os.close, fd2)
3280 self.assertEqual(os.get_inheritable(fd2), False)
3281
3282 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3283 def test_dup2(self):
3284 fd = os.open(__file__, os.O_RDONLY)
3285 self.addCleanup(os.close, fd)
3286
3287 # inheritable by default
3288 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003289 self.addCleanup(os.close, fd2)
3290 self.assertEqual(os.dup2(fd, fd2), fd2)
3291 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003292
3293 # force non-inheritable
3294 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003295 self.addCleanup(os.close, fd3)
3296 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3297 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003298
3299 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3300 def test_openpty(self):
3301 master_fd, slave_fd = os.openpty()
3302 self.addCleanup(os.close, master_fd)
3303 self.addCleanup(os.close, slave_fd)
3304 self.assertEqual(os.get_inheritable(master_fd), False)
3305 self.assertEqual(os.get_inheritable(slave_fd), False)
3306
3307
Brett Cannon3f9183b2016-08-26 14:44:48 -07003308class PathTConverterTests(unittest.TestCase):
3309 # tuples of (function name, allows fd arguments, additional arguments to
3310 # function, cleanup function)
3311 functions = [
3312 ('stat', True, (), None),
3313 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003314 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003315 ('chflags', False, (0,), None),
3316 ('lchflags', False, (0,), None),
3317 ('open', False, (0,), getattr(os, 'close', None)),
3318 ]
3319
3320 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003321 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003322 if os.name == 'nt':
3323 bytes_fspath = bytes_filename = None
3324 else:
3325 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003326 bytes_fspath = FakePath(bytes_filename)
3327 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003328 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003329 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003330
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003331 int_fspath = FakePath(fd)
3332 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003333
3334 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3335 with self.subTest(name=name):
3336 try:
3337 fn = getattr(os, name)
3338 except AttributeError:
3339 continue
3340
Brett Cannon8f96a302016-08-26 19:30:11 -07003341 for path in (str_filename, bytes_filename, str_fspath,
3342 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003343 if path is None:
3344 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003345 with self.subTest(name=name, path=path):
3346 result = fn(path, *extra_args)
3347 if cleanup_fn is not None:
3348 cleanup_fn(result)
3349
3350 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003351 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003352 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003353
3354 if allow_fd:
3355 result = fn(fd, *extra_args) # should not fail
3356 if cleanup_fn is not None:
3357 cleanup_fn(result)
3358 else:
3359 with self.assertRaisesRegex(
3360 TypeError,
3361 'os.PathLike'):
3362 fn(fd, *extra_args)
3363
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003364 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003365 msg = r'__fspath__\(\) to return str or bytes, not %s'
3366 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003367 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003368 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003369 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003370 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003371 os.stat(FakePath(object()))
3372
Brett Cannon3f9183b2016-08-26 14:44:48 -07003373
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003374@unittest.skipUnless(hasattr(os, 'get_blocking'),
3375 'needs os.get_blocking() and os.set_blocking()')
3376class BlockingTests(unittest.TestCase):
3377 def test_blocking(self):
3378 fd = os.open(__file__, os.O_RDONLY)
3379 self.addCleanup(os.close, fd)
3380 self.assertEqual(os.get_blocking(fd), True)
3381
3382 os.set_blocking(fd, False)
3383 self.assertEqual(os.get_blocking(fd), False)
3384
3385 os.set_blocking(fd, True)
3386 self.assertEqual(os.get_blocking(fd), True)
3387
3388
Yury Selivanov97e2e062014-09-26 12:33:06 -04003389
3390class ExportsTests(unittest.TestCase):
3391 def test_os_all(self):
3392 self.assertIn('open', os.__all__)
3393 self.assertIn('walk', os.__all__)
3394
3395
Victor Stinner6036e442015-03-08 01:58:04 +01003396class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003397 check_no_resource_warning = support.check_no_resource_warning
3398
Victor Stinner6036e442015-03-08 01:58:04 +01003399 def setUp(self):
3400 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003401 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003402 self.addCleanup(support.rmtree, self.path)
3403 os.mkdir(self.path)
3404
3405 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003406 path = self.bytes_path if isinstance(name, bytes) else self.path
3407 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003408 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003409 return filename
3410
3411 def get_entries(self, names):
3412 entries = dict((entry.name, entry)
3413 for entry in os.scandir(self.path))
3414 self.assertEqual(sorted(entries.keys()), names)
3415 return entries
3416
3417 def assert_stat_equal(self, stat1, stat2, skip_fields):
3418 if skip_fields:
3419 for attr in dir(stat1):
3420 if not attr.startswith("st_"):
3421 continue
3422 if attr in ("st_dev", "st_ino", "st_nlink"):
3423 continue
3424 self.assertEqual(getattr(stat1, attr),
3425 getattr(stat2, attr),
3426 (stat1, stat2, attr))
3427 else:
3428 self.assertEqual(stat1, stat2)
3429
3430 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003431 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003432 self.assertEqual(entry.name, name)
3433 self.assertEqual(entry.path, os.path.join(self.path, name))
3434 self.assertEqual(entry.inode(),
3435 os.stat(entry.path, follow_symlinks=False).st_ino)
3436
3437 entry_stat = os.stat(entry.path)
3438 self.assertEqual(entry.is_dir(),
3439 stat.S_ISDIR(entry_stat.st_mode))
3440 self.assertEqual(entry.is_file(),
3441 stat.S_ISREG(entry_stat.st_mode))
3442 self.assertEqual(entry.is_symlink(),
3443 os.path.islink(entry.path))
3444
3445 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3446 self.assertEqual(entry.is_dir(follow_symlinks=False),
3447 stat.S_ISDIR(entry_lstat.st_mode))
3448 self.assertEqual(entry.is_file(follow_symlinks=False),
3449 stat.S_ISREG(entry_lstat.st_mode))
3450
3451 self.assert_stat_equal(entry.stat(),
3452 entry_stat,
3453 os.name == 'nt' and not is_symlink)
3454 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3455 entry_lstat,
3456 os.name == 'nt')
3457
3458 def test_attributes(self):
3459 link = hasattr(os, 'link')
3460 symlink = support.can_symlink()
3461
3462 dirname = os.path.join(self.path, "dir")
3463 os.mkdir(dirname)
3464 filename = self.create_file("file.txt")
3465 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003466 try:
3467 os.link(filename, os.path.join(self.path, "link_file.txt"))
3468 except PermissionError as e:
3469 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003470 if symlink:
3471 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3472 target_is_directory=True)
3473 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3474
3475 names = ['dir', 'file.txt']
3476 if link:
3477 names.append('link_file.txt')
3478 if symlink:
3479 names.extend(('symlink_dir', 'symlink_file.txt'))
3480 entries = self.get_entries(names)
3481
3482 entry = entries['dir']
3483 self.check_entry(entry, 'dir', True, False, False)
3484
3485 entry = entries['file.txt']
3486 self.check_entry(entry, 'file.txt', False, True, False)
3487
3488 if link:
3489 entry = entries['link_file.txt']
3490 self.check_entry(entry, 'link_file.txt', False, True, False)
3491
3492 if symlink:
3493 entry = entries['symlink_dir']
3494 self.check_entry(entry, 'symlink_dir', True, False, True)
3495
3496 entry = entries['symlink_file.txt']
3497 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3498
3499 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003500 path = self.bytes_path if isinstance(name, bytes) else self.path
3501 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003502 self.assertEqual(len(entries), 1)
3503
3504 entry = entries[0]
3505 self.assertEqual(entry.name, name)
3506 return entry
3507
Brett Cannon96881cd2016-06-10 14:37:21 -07003508 def create_file_entry(self, name='file.txt'):
3509 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003510 return self.get_entry(os.path.basename(filename))
3511
3512 def test_current_directory(self):
3513 filename = self.create_file()
3514 old_dir = os.getcwd()
3515 try:
3516 os.chdir(self.path)
3517
3518 # call scandir() without parameter: it must list the content
3519 # of the current directory
3520 entries = dict((entry.name, entry) for entry in os.scandir())
3521 self.assertEqual(sorted(entries.keys()),
3522 [os.path.basename(filename)])
3523 finally:
3524 os.chdir(old_dir)
3525
3526 def test_repr(self):
3527 entry = self.create_file_entry()
3528 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3529
Brett Cannon96881cd2016-06-10 14:37:21 -07003530 def test_fspath_protocol(self):
3531 entry = self.create_file_entry()
3532 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3533
3534 def test_fspath_protocol_bytes(self):
3535 bytes_filename = os.fsencode('bytesfile.txt')
3536 bytes_entry = self.create_file_entry(name=bytes_filename)
3537 fspath = os.fspath(bytes_entry)
3538 self.assertIsInstance(fspath, bytes)
3539 self.assertEqual(fspath,
3540 os.path.join(os.fsencode(self.path),bytes_filename))
3541
Victor Stinner6036e442015-03-08 01:58:04 +01003542 def test_removed_dir(self):
3543 path = os.path.join(self.path, 'dir')
3544
3545 os.mkdir(path)
3546 entry = self.get_entry('dir')
3547 os.rmdir(path)
3548
3549 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3550 if os.name == 'nt':
3551 self.assertTrue(entry.is_dir())
3552 self.assertFalse(entry.is_file())
3553 self.assertFalse(entry.is_symlink())
3554 if os.name == 'nt':
3555 self.assertRaises(FileNotFoundError, entry.inode)
3556 # don't fail
3557 entry.stat()
3558 entry.stat(follow_symlinks=False)
3559 else:
3560 self.assertGreater(entry.inode(), 0)
3561 self.assertRaises(FileNotFoundError, entry.stat)
3562 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3563
3564 def test_removed_file(self):
3565 entry = self.create_file_entry()
3566 os.unlink(entry.path)
3567
3568 self.assertFalse(entry.is_dir())
3569 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3570 if os.name == 'nt':
3571 self.assertTrue(entry.is_file())
3572 self.assertFalse(entry.is_symlink())
3573 if os.name == 'nt':
3574 self.assertRaises(FileNotFoundError, entry.inode)
3575 # don't fail
3576 entry.stat()
3577 entry.stat(follow_symlinks=False)
3578 else:
3579 self.assertGreater(entry.inode(), 0)
3580 self.assertRaises(FileNotFoundError, entry.stat)
3581 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3582
3583 def test_broken_symlink(self):
3584 if not support.can_symlink():
3585 return self.skipTest('cannot create symbolic link')
3586
3587 filename = self.create_file("file.txt")
3588 os.symlink(filename,
3589 os.path.join(self.path, "symlink.txt"))
3590 entries = self.get_entries(['file.txt', 'symlink.txt'])
3591 entry = entries['symlink.txt']
3592 os.unlink(filename)
3593
3594 self.assertGreater(entry.inode(), 0)
3595 self.assertFalse(entry.is_dir())
3596 self.assertFalse(entry.is_file()) # broken symlink returns False
3597 self.assertFalse(entry.is_dir(follow_symlinks=False))
3598 self.assertFalse(entry.is_file(follow_symlinks=False))
3599 self.assertTrue(entry.is_symlink())
3600 self.assertRaises(FileNotFoundError, entry.stat)
3601 # don't fail
3602 entry.stat(follow_symlinks=False)
3603
3604 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003605 self.create_file("file.txt")
3606
3607 path_bytes = os.fsencode(self.path)
3608 entries = list(os.scandir(path_bytes))
3609 self.assertEqual(len(entries), 1, entries)
3610 entry = entries[0]
3611
3612 self.assertEqual(entry.name, b'file.txt')
3613 self.assertEqual(entry.path,
3614 os.fsencode(os.path.join(self.path, 'file.txt')))
3615
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003616 def test_bytes_like(self):
3617 self.create_file("file.txt")
3618
3619 for cls in bytearray, memoryview:
3620 path_bytes = cls(os.fsencode(self.path))
3621 with self.assertWarns(DeprecationWarning):
3622 entries = list(os.scandir(path_bytes))
3623 self.assertEqual(len(entries), 1, entries)
3624 entry = entries[0]
3625
3626 self.assertEqual(entry.name, b'file.txt')
3627 self.assertEqual(entry.path,
3628 os.fsencode(os.path.join(self.path, 'file.txt')))
3629 self.assertIs(type(entry.name), bytes)
3630 self.assertIs(type(entry.path), bytes)
3631
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003632 @unittest.skipUnless(os.listdir in os.supports_fd,
3633 'fd support for listdir required for this test.')
3634 def test_fd(self):
3635 self.assertIn(os.scandir, os.supports_fd)
3636 self.create_file('file.txt')
3637 expected_names = ['file.txt']
3638 if support.can_symlink():
3639 os.symlink('file.txt', os.path.join(self.path, 'link'))
3640 expected_names.append('link')
3641
3642 fd = os.open(self.path, os.O_RDONLY)
3643 try:
3644 with os.scandir(fd) as it:
3645 entries = list(it)
3646 names = [entry.name for entry in entries]
3647 self.assertEqual(sorted(names), expected_names)
3648 self.assertEqual(names, os.listdir(fd))
3649 for entry in entries:
3650 self.assertEqual(entry.path, entry.name)
3651 self.assertEqual(os.fspath(entry), entry.name)
3652 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3653 if os.stat in os.supports_dir_fd:
3654 st = os.stat(entry.name, dir_fd=fd)
3655 self.assertEqual(entry.stat(), st)
3656 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3657 self.assertEqual(entry.stat(follow_symlinks=False), st)
3658 finally:
3659 os.close(fd)
3660
Victor Stinner6036e442015-03-08 01:58:04 +01003661 def test_empty_path(self):
3662 self.assertRaises(FileNotFoundError, os.scandir, '')
3663
3664 def test_consume_iterator_twice(self):
3665 self.create_file("file.txt")
3666 iterator = os.scandir(self.path)
3667
3668 entries = list(iterator)
3669 self.assertEqual(len(entries), 1, entries)
3670
3671 # check than consuming the iterator twice doesn't raise exception
3672 entries2 = list(iterator)
3673 self.assertEqual(len(entries2), 0, entries2)
3674
3675 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003676 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003677 self.assertRaises(TypeError, os.scandir, obj)
3678
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003679 def test_close(self):
3680 self.create_file("file.txt")
3681 self.create_file("file2.txt")
3682 iterator = os.scandir(self.path)
3683 next(iterator)
3684 iterator.close()
3685 # multiple closes
3686 iterator.close()
3687 with self.check_no_resource_warning():
3688 del iterator
3689
3690 def test_context_manager(self):
3691 self.create_file("file.txt")
3692 self.create_file("file2.txt")
3693 with os.scandir(self.path) as iterator:
3694 next(iterator)
3695 with self.check_no_resource_warning():
3696 del iterator
3697
3698 def test_context_manager_close(self):
3699 self.create_file("file.txt")
3700 self.create_file("file2.txt")
3701 with os.scandir(self.path) as iterator:
3702 next(iterator)
3703 iterator.close()
3704
3705 def test_context_manager_exception(self):
3706 self.create_file("file.txt")
3707 self.create_file("file2.txt")
3708 with self.assertRaises(ZeroDivisionError):
3709 with os.scandir(self.path) as iterator:
3710 next(iterator)
3711 1/0
3712 with self.check_no_resource_warning():
3713 del iterator
3714
3715 def test_resource_warning(self):
3716 self.create_file("file.txt")
3717 self.create_file("file2.txt")
3718 iterator = os.scandir(self.path)
3719 next(iterator)
3720 with self.assertWarns(ResourceWarning):
3721 del iterator
3722 support.gc_collect()
3723 # exhausted iterator
3724 iterator = os.scandir(self.path)
3725 list(iterator)
3726 with self.check_no_resource_warning():
3727 del iterator
3728
Victor Stinner6036e442015-03-08 01:58:04 +01003729
Ethan Furmancdc08792016-06-02 15:06:09 -07003730class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003731
3732 # Abstracted so it can be overridden to test pure Python implementation
3733 # if a C version is provided.
3734 fspath = staticmethod(os.fspath)
3735
Ethan Furmancdc08792016-06-02 15:06:09 -07003736 def test_return_bytes(self):
3737 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003738 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003739
3740 def test_return_string(self):
3741 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003742 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003743
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003744 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003745 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003746 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003747
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003748 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003749 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3750 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3751
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003752 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003753 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3754 self.assertTrue(issubclass(FakePath, os.PathLike))
3755 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003756
Ethan Furmancdc08792016-06-02 15:06:09 -07003757 def test_garbage_in_exception_out(self):
3758 vapor = type('blah', (), {})
3759 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003760 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003761
3762 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003763 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003764
Brett Cannon044283a2016-07-15 10:41:49 -07003765 def test_bad_pathlike(self):
3766 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003767 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003768 # __fspath__ attribute that is not callable.
3769 c = type('foo', (), {})
3770 c.__fspath__ = 1
3771 self.assertRaises(TypeError, self.fspath, c())
3772 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003773 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003774 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003775
Victor Stinnerc29b5852017-11-02 07:28:27 -07003776
3777class TimesTests(unittest.TestCase):
3778 def test_times(self):
3779 times = os.times()
3780 self.assertIsInstance(times, os.times_result)
3781
3782 for field in ('user', 'system', 'children_user', 'children_system',
3783 'elapsed'):
3784 value = getattr(times, field)
3785 self.assertIsInstance(value, float)
3786
3787 if os.name == 'nt':
3788 self.assertEqual(times.children_user, 0)
3789 self.assertEqual(times.children_system, 0)
3790 self.assertEqual(times.elapsed, 0)
3791
3792
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003793# Only test if the C version is provided, otherwise TestPEP519 already tested
3794# the pure Python implementation.
3795if hasattr(os, "_fspath"):
3796 class TestPEP519PurePython(TestPEP519):
3797
3798 """Explicitly test the pure Python implementation of os.fspath()."""
3799
3800 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003801
3802
Fred Drake2e2be372001-09-20 21:33:42 +00003803if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003804 unittest.main()