blob: 820c99c7a07cb469495e10d05e4ebbf1d299b7e7 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
# Optional, platform-specific modules.  Each name falls back to None when the
# module cannot be imported so that tests can check for it before use.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None

# groups: gids of every group that lists the current user as a member, plus
# the process gid when os.getgid() exists.  Empty when grp is unavailable.
try:
    import grp
    # Look the user name up once instead of once per group entry.
    _current_user = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []

# all_users: uid of every entry in the password database, or an empty list
# when pwd (or pwd.getpwall) is unavailable.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []

# Integer limits from the C test helper module; fall back to sys.maxsize for
# both when _testcapi cannot be imported.
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Berker Peksagce643912015-05-06 06:33:17 +030064from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020065from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000066
Victor Stinner923590e2016-03-24 09:11:48 +010067
# True when the process runs with effective uid 0; always False on platforms
# without os.geteuid().
root_in_posix = False
if hasattr(os, 'geteuid'):
    root_in_posix = (os.geteuid() == 0)

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# The platform check comes first, so os.getgid() is only called on FreeBSD.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
83
Victor Stinner923590e2016-03-24 09:11:48 +010084
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*.

    The skip reason is 'requires os.<name>'.
    """
    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
87
88
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so
    FileExistsError is raised if *filename* already exists.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)
92
93
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests of low-level os file functions that operate on support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  lexists() is used so that a symlink
        # whose target does not exist is removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same cleanup runs after each test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # The reference count of the path must be the same before and after
        # an os.rename() call that fails with TypeError.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command *args* and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(),
        # forwarding *args (mode, buffering) unchanged.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first: fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists and must end up with the source's
        # content; the source path must be gone.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
233
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235# Test attributes on return values from os.*stat* family.
236class StatAttributeTests(unittest.TestCase):
237 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200238 self.fname = support.TESTFN
239 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100240 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000243 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244
245 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(result[stat.ST_SIZE], 3)
247 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 # Make sure all the attributes are there
250 members = dir(result)
251 for name in dir(stat):
252 if name[:3] == 'ST_':
253 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000254 if name.endswith("TIME"):
255 def trunc(x): return int(x)
256 else:
257 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000260 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700263 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700264 for name in 'st_atime st_mtime st_ctime'.split():
265 floaty = int(getattr(result, name) * 100000)
266 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700267 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 try:
270 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 except IndexError:
273 pass
274
275 # Make sure that assignment fails
276 try:
277 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200278 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000279 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 pass
281
282 try:
283 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200284 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000285 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 pass
287
288 try:
289 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except AttributeError:
292 pass
293
294 # Use the stat_result constructor with a too-short tuple.
295 try:
296 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 except TypeError:
299 pass
300
Ezio Melotti42da6632011-03-15 05:18:48 +0200301 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 try:
303 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
304 except TypeError:
305 pass
306
Antoine Pitrou38425292010-09-21 18:19:07 +0000307 def test_stat_attributes(self):
308 self.check_stat_attributes(self.fname)
309
310 def test_stat_attributes_bytes(self):
311 try:
312 fname = self.fname.encode(sys.getfilesystemencoding())
313 except UnicodeEncodeError:
314 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700329 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100340 self.assertTrue(isinstance(result.f_fsid, int))
341
342 # Test that the size of the tuple doesn't change
343 self.assertEqual(len(result), 10)
344
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700374 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Berker Peksag0b4dc482016-09-17 15:49:59 +0300434 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
435 def test_access_denied(self):
436 # Default to FindFirstFile WIN32_FIND_DATA when access is
437 # denied. See issue 28075.
438 # os.environ['TEMP'] should be located on a volume that
439 # supports file ACLs.
440 fname = os.path.join(os.environ['TEMP'], self.fname)
441 self.addCleanup(support.unlink, fname)
442 create_file(fname, b'ABC')
443 # Deny the right to [S]YNCHRONIZE on the file to
444 # force CreateFile to fail with ERROR_ACCESS_DENIED.
445 DETACHED_PROCESS = 8
446 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500447 # bpo-30584: Use security identifier *S-1-5-32-545 instead
448 # of localized "Users" to not depend on the locale.
449 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300450 creationflags=DETACHED_PROCESS
451 )
452 result = os.stat(fname)
453 self.assertNotEqual(result.st_size, 0)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a file inside a scratch directory."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() reports the requested timestamps.  *filename* defaults
        # to self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name of the volume holding *path* on
        # Windows (via GetVolumeInformationW); None elsewhere or on failure.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        # times and ns must each be a tuple of exactly two items
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # Features the platform lacks must be rejected explicitly.
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
676
Victor Stinner47aacc82015-06-12 17:26:23 +0200677
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000678from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000679
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against os.environ.

    Also holds os.environ specific checks (popen, repr, get_exec_path,
    os.environb mirroring, key errors and mutation during iteration).
    """
    type2test = None

    def setUp(self):
        # Snapshot the real environment so tearDown() can put it back, then
        # seed the reference items expected by the mapping protocol tests.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Restore the str view first, then the bytes view when available.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Items used by the inherited mapping protocol tests.
        return dict(KEY1="VALUE1", KEY2="VALUE2", KEY3="VALUE3")

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, emptied in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        # A variable set through update() must be visible to a child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = ("%s -c 'echo \"line1\nline2\nline3\"'"
                   % unix_shell)
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        # Every reference item installed by setUp() must be readable.
        for name, text in self._reference().items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(pairs))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict to check the default.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_bytes = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_str = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            bad_key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            bad_key, expected_error = 'key=', OSError
        with self.assertRaises(expected_error):
            del os.environ[bad_key]

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup: the KeyError must carry the caller's own key object and
        # must not expose the internal exception as context.
        with self.assertRaises(KeyError) as caught:
            os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

        # Deletion: same expectations.
        with self.assertRaises(KeyError) as caught:
            del os.environ[missing]
        self.assertIs(caught.exception.args[0], missing)
        self.assertTrue(caught.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over *collection*, add a key to os.environ, then
        # advance the iterator again; the new key must still be readable.
        walker = iter(collection)
        added_key = "__new_key__"

        next(walker)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[added_key] = "test_environ_iteration"

        try:
            next(walker)  # force iteration over modified mapping
            self.assertEqual(os.environ[added_key], "test_environ_iteration")
        finally:
            del os.environ[added_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
858
Victor Stinner6d101392013-04-14 16:35:04 +0200859
Tim Petersc4e09402003-04-25 07:11:48 +0000860class WalkTests(unittest.TestCase):
861 """Tests for os.walk()."""
862
Victor Stinner0561c532015-03-12 10:28:24 +0100863 # Wrapper to hide minor differences between os.walk and os.fwalk
864 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200865 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200866 if 'follow_symlinks' in kwargs:
867 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200868 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100869
Charles-François Natali7372b062012-02-05 15:15:38 +0100870 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100871 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100872 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000873
874 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000875 # TESTFN/
876 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000877 # tmp1
878 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000879 # tmp2
880 # SUB11/ no kids
881 # SUB2/ a file kid and a dirsymlink kid
882 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300883 # SUB21/ not readable
884 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000885 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200886 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300887 # broken_link2
888 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000889 # TEST2/
890 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100891 self.walk_path = join(support.TESTFN, "TEST1")
892 self.sub1_path = join(self.walk_path, "SUB1")
893 self.sub11_path = join(self.sub1_path, "SUB11")
894 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300895 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100896 tmp1_path = join(self.walk_path, "tmp1")
897 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000898 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300899 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100900 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000901 t2_path = join(support.TESTFN, "TEST2")
902 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200903 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300904 broken_link2_path = join(sub2_path, "broken_link2")
905 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000906
907 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100908 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000909 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300910 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000911 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100912
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300913 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100914 with open(path, "x") as f:
915 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000916
Victor Stinner0561c532015-03-12 10:28:24 +0100917 if support.can_symlink():
918 os.symlink(os.path.abspath(t2_path), self.link_path)
919 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300920 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
921 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300922 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300923 ["broken_link", "broken_link2", "broken_link3",
924 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100925 else:
pxinwr3e028b22019-02-15 13:04:47 +0800926 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100927
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300928 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300929 try:
930 os.listdir(sub21_path)
931 except PermissionError:
932 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
933 else:
934 os.chmod(sub21_path, stat.S_IRWXU)
935 os.unlink(tmp5_path)
936 os.rmdir(sub21_path)
937 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300938
Victor Stinner0561c532015-03-12 10:28:24 +0100939 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000940 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300941 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100942
Tim Petersc4e09402003-04-25 07:11:48 +0000943 self.assertEqual(len(all), 4)
944 # We can't know which order SUB1 and SUB2 will appear in.
945 # Not flipped: TESTFN, SUB1, SUB11, SUB2
946 # flipped: TESTFN, SUB2, SUB1, SUB11
947 flipped = all[0][1][0] != "SUB1"
948 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200949 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300950 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100951 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
952 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
953 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
954 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000955
Brett Cannon3f9183b2016-08-26 14:44:48 -0700956 def test_walk_prune(self, walk_path=None):
957 if walk_path is None:
958 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000959 # Prune the search.
960 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700961 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000962 all.append((root, dirs, files))
963 # Don't descend into SUB1.
964 if 'SUB1' in dirs:
965 # Note that this also mutates the dirs we appended to all!
966 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000967
Victor Stinner0561c532015-03-12 10:28:24 +0100968 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200969 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100970
971 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300972 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100973 self.assertEqual(all[1], self.sub2_tree)
974
Brett Cannon3f9183b2016-08-26 14:44:48 -0700975 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200976 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700977
Victor Stinner0561c532015-03-12 10:28:24 +0100978 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000979 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100980 all = list(self.walk(self.walk_path, topdown=False))
981
Victor Stinner53b0a412016-03-26 01:12:36 +0100982 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000983 # We can't know which order SUB1 and SUB2 will appear in.
984 # Not flipped: SUB11, SUB1, SUB2, TESTFN
985 # flipped: SUB2, SUB11, SUB1, TESTFN
986 flipped = all[3][1][0] != "SUB1"
987 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200988 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300989 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100990 self.assertEqual(all[3],
991 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
992 self.assertEqual(all[flipped],
993 (self.sub11_path, [], []))
994 self.assertEqual(all[flipped + 1],
995 (self.sub1_path, ["SUB11"], ["tmp2"]))
996 self.assertEqual(all[2 - 2 * flipped],
997 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000998
Victor Stinner0561c532015-03-12 10:28:24 +0100999 def test_walk_symlink(self):
1000 if not support.can_symlink():
1001 self.skipTest("need symlink support")
1002
1003 # Walk, following symlinks.
1004 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1005 for root, dirs, files in walk_it:
1006 if root == self.link_path:
1007 self.assertEqual(dirs, [])
1008 self.assertEqual(files, ["tmp4"])
1009 break
1010 else:
1011 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001012
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001013 def test_walk_bad_dir(self):
1014 # Walk top-down.
1015 errors = []
1016 walk_it = self.walk(self.walk_path, onerror=errors.append)
1017 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001018 self.assertEqual(errors, [])
1019 dir1 = 'SUB1'
1020 path1 = os.path.join(root, dir1)
1021 path1new = os.path.join(root, dir1 + '.new')
1022 os.rename(path1, path1new)
1023 try:
1024 roots = [r for r, d, f in walk_it]
1025 self.assertTrue(errors)
1026 self.assertNotIn(path1, roots)
1027 self.assertNotIn(path1new, roots)
1028 for dir2 in dirs:
1029 if dir2 != dir1:
1030 self.assertIn(os.path.join(root, dir2), roots)
1031 finally:
1032 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001033
Charles-François Natali7372b062012-02-05 15:15:38 +01001034
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk() 4-tuples to the 3-tuples the inherited tests expect.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Separate hook so subclasses can substitute another fwalk flavour.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every topdown/follow-symlinks combination, each directory
        yielded by fwalk() must have been yielded by os.walk() with the
        same sets of subdirectory and file names.
        """
        # Work on copies: callers may pass the same dict for both arguments.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails, the finally clause must not hit an unbound 'fd' and hide
        # the real error behind a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1102
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Translate the fwalk-style keyword to the one os.walk() accepts.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Copy any edits the caller made to the str lists back into the
            # bytes lists owned by os.walk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1115
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Call os.fwalk() with a bytes path but yield str names, so the
        # inherited str-based tests can be reused unchanged.
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Copy any edits the caller made to the str lists back into the
            # bytes lists owned by os.fwalk().
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))
1127
Charles-François Natali7372b062012-02-05 15:15:38 +01001128
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The leaf is expected to get the requested mode, the
                # intermediate directory the default mode under the umask.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even if an assertion
        # fails, so a failure here cannot leak into later tests.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() calls os.removedirs(),
            # which cannot delete a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1221
Andrew Svetlov405faed2012-12-25 12:18:09 +02001222
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown() run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid = info.st_uid
        gid = info.st_gid
        # Float, complex, Decimal and Fraction values are rejected for
        # both the uid and the gid argument.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and read it back each time.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and read it back each time.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls stay inside the context manager, as in the original.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1275
1276
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_dira_dirb(self):
        # Create TESTFN/dira/dirb one level at a time; return both paths.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_dira_dirb()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the upward removal after dirb.
        dira, dirb = self._make_dira_dirb()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # A file in the leaf directory makes removedirs() fail outright.
        dira, dirb = self._make_dira_dirb()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1316
1317
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with statement closes
        # the file, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001325
Andrew Svetlov405faed2012-12-25 12:18:09 +02001326
class URandomTests(unittest.TestCase):
    # Basic properties of os.urandom(): requested length, result type, and
    # that successive calls (in-process or across processes) differ.

    def test_urandom_length(self):
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a fresh interpreter and return the raw
        # bytes it wrote to stdout.
        script_lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        _, stdout, _ = assert_python_ok('-c', '\n'.join(script_lines))
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two independent child processes must not produce the same bytes.
        samples = [self.get_urandom_subprocess(16) for _ in range(2)]
        self.assertNotEqual(samples[0], samples[1])
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001356
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001357
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Probe the syscall once: Python may have been compiled on a more
        # recent Linux version than the kernel it is running on.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists.
        # BlockingIOError only means the system urandom is not initialized
        # yet, which is acceptable here.
        with contextlib.suppress(BlockingIOError):
            os.getrandom(1, os.GRND_NONBLOCK)

    def test_getrandom_value(self):
        first, second = os.getrandom(16), os.getrandom(16)
        self.assertNotEqual(first, second)
1399
1400
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# Each build flag below is 1 when the corresponding facility was detected.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001407
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    # Behaviour of os.urandom() when it is backed by a file descriptor.
    # Every scenario runs in a child interpreter because it tampers with
    # process-wide descriptor state.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a clean exit of the child is required; its output is not
        # inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The descriptor now refers to a file full of b"x"; output that still
        # varies within a run and between runs shows it was not read from
        # that file.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1487
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001488
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001489@contextlib.contextmanager
1490def _execvpe_mockup(defpath=None):
1491 """
1492 Stubs out execv and execve functions when used as context manager.
1493 Records exec calls. The mock execv and execve functions always raise an
1494 exception as they would normally never return.
1495 """
1496 # A list of tuples containing (function name, first arg, args)
1497 # of calls to execv or execve that have been made.
1498 calls = []
1499
1500 def mock_execv(name, *args):
1501 calls.append(('execv', name, args))
1502 raise RuntimeError("execv called")
1503
1504 def mock_execve(name, *args):
1505 calls.append(('execve', name, args))
1506 raise OSError(errno.ENOTDIR, "execve called")
1507
1508 try:
1509 orig_execv = os.execv
1510 orig_execve = os.execve
1511 orig_defpath = os.defpath
1512 os.execv = mock_execv
1513 os.execve = mock_execve
1514 if defpath is not None:
1515 os.defpath = defpath
1516 yield calls
1517 finally:
1518 os.execv = orig_execv
1519 os.execve = orig_execve
1520 os.defpath = orig_defpath
1521
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    # Argument validation of the os.exec*() family, and the path lookup done
    # by the internal os._execvpe() helper.

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Run os._execvpe() against the stubbed execv()/execve() and check
        # which executable path it tries.  test_type (str or bytes) selects
        # the type used for the program name.
        search_dir = os.sep + 'absolutepath'
        as_bytes = test_type is bytes
        if as_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if as_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised off Windows.
        program_types = [str]
        if os.name != "nt":
            program_types.append(bytes)
        for program_type in program_types:
            self._test_internal_execvpe(program_type)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as exc:
            # A winerror of 0 would mean the error code was not captured.
            self.assertTrue(exc.winerror is None or exc.winerror != 0)
        else:
            self.fail('No OSError raised')
1626
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001627
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test applies an os function to support.TESTFN in a state where
    # the call cannot succeed and expects OSError.

    def setUp(self):
        # The tests rely on support.TESTFN not existing beforehand; fail
        # early with a clear message if it does, or if stat() fails for any
        # reason other than "not found".
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Mode "x" creates the file and fails if it already exists; mkdir()
        # on the same name must then be refused while the file is open.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001662
Victor Stinnere77c9742016-03-25 10:28:23 +01001663
class TestInvalidFD(unittest.TestCase):
    # Checks how os functions that take a file descriptor behave when they
    # are handed the descriptor returned by support.make_bad_fd().

    # Names of os functions that are called with the bad descriptor as their
    # only argument; one test method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for os.<f>.  The hasattr() check happens when
        # the test runs, so a function missing from os makes the generated
        # test do nothing instead of failing.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Store the generated functions into the class namespace as test_fchdir,
    # test_dup, ... so that they become methods of this class.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # After the loop, i is the offset of the first descriptor for which
        # fstat() succeeded, or 9 when all ten probes failed.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        # closerange() over descriptors that all failed fstat() must return
        # None without raising.
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf() is checked too, with the bad descriptor passed as its
        # first argument.
        # NOTE(review): the skip condition only looks at os.fpathconf.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate() is checked too, with the bad descriptor passed as its
        # first argument.
        # NOTE(review): the skip condition only looks at os.ftruncate.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1762
Brian Curtin1b9df392010-11-24 20:24:31 +00001763
class LinkTests(unittest.TestCase):
    # os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Uses the current attribute values, so names changed by a test
        # (see test_unicode_name) are the ones cleaned up.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names open
        # the same underlying file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, which is the conversion the os functions themselves
        # use for file names.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1800
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Argument checking of the os.set*uid()/os.set*gid() functions.  Every
    # test expects a non-privileged call with id 0 to raise OSError, a
    # non-integer argument to raise TypeError, and an out-of-range id to
    # raise OverflowError.

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # The function under test here is seteuid(); setegid() has its own
        # test below and is not covered by this method's skip condition.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001868
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # File names that survive an fsencode()/fsdecode() round trip must be
    # usable with listdir(), open(), stat() and statvfs().

    def setUp(self):
        # Directory name: the first non-empty choice among the unencodable,
        # the non-ASCII and the plain test file names.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; those that cannot be encoded are dropped.
        candidates = [support.TESTFN_UNICODE]
        candidates.extend(
            name for name in (support.TESTFN_UNENCODABLE,
                              support.TESTFN_NONASCII)
            if name)
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except:
            # tearDown() does not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must simply succeed.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            target = os.path.join(self.dir, name)
            os.stat(target)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001940
Brian Curtineb24d742010-04-12 17:16:38 +00001941@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1942class Win32KillTests(unittest.TestCase):
def _kill(self, sig):
    # Start sys.executable as a subprocess and communicate from the
    # subprocess to the parent that the interpreter is ready. When it
    # becomes ready, send *sig* via os.kill to the subprocess and check
    # that the return code is equal to *sig*.
    import ctypes
    from ctypes import wintypes
    import msvcrt

    # Since we can't access the contents of the process' stdout until the
    # process has exited, use PeekNamedPipe to see what's inside stdout
    # without waiting. This is done so we can tell that the interpreter
    # is started and running at a point where it could handle a signal.
    PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
    PeekNamedPipe.restype = wintypes.BOOL
    PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                              ctypes.POINTER(ctypes.c_char), # stdout buf
                              wintypes.DWORD, # Buffer size
                              ctypes.POINTER(wintypes.DWORD), # bytes read
                              ctypes.POINTER(wintypes.DWORD), # bytes avail
                              ctypes.POINTER(wintypes.DWORD)) # bytes left
    # The child writes this marker to stdout, flushes, then blocks in
    # input() until it is killed.
    msg = "running"
    proc = subprocess.Popen([sys.executable, "-c",
                             "import sys;"
                             "sys.stdout.write('{}');"
                             "sys.stdout.flush();"
                             "input()".format(msg)],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdin=subprocess.PIPE)
    self.addCleanup(proc.stdout.close)
    self.addCleanup(proc.stderr.close)
    self.addCleanup(proc.stdin.close)

    # Poll at most 100 times, sleeping 0.1 second between polls, and stop
    # early if the child exits.  NOTE: 'max' shadows the builtin inside
    # this method.
    count, max = 0, 100
    while count < max and proc.poll() is None:
        # Create a string buffer to store the result of stdout from the pipe
        buf = ctypes.create_string_buffer(len(msg))
        # Obtain the text currently in proc.stdout
        # Bytes read/avail/left are left as NULL and unused
        rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                             buf, ctypes.sizeof(buf), None, None, None)
        self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
        if buf.value:
            self.assertEqual(msg, buf.value.decode())
            break
        time.sleep(0.1)
        count += 1
    else:
        # Reached only when the loop ended without 'break': the marker was
        # never seen before the attempts ran out or the child exited.
        self.fail("Did not receive communication from the subprocess")

    os.kill(proc.pid, sig)
    self.assertEqual(proc.wait(), sig)
1996
1997 def test_kill_sigterm(self):
1998 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001999 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00002000
2001 def test_kill_int(self):
2002 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00002003 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00002004
2005 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002006 tagname = "test_os_%s" % uuid.uuid1()
2007 m = mmap.mmap(-1, 1, tagname)
2008 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00002009 # Run a script which has console control handling enabled.
2010 proc = subprocess.Popen([sys.executable,
2011 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002012 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00002013 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2014 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002015 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002016 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002017 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002018 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002019 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002020 count += 1
2021 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002022 # Forcefully kill the process if we weren't able to signal it.
2023 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002024 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002025 os.kill(proc.pid, event)
2026 # proc.send_signal(event) could also be done here.
2027 # Allow time for the signal to be passed and the process to exit.
2028 time.sleep(0.5)
2029 if not proc.poll():
2030 # Forcefully kill the process if we weren't able to signal it.
2031 os.kill(proc.pid, signal.SIGINT)
2032 self.fail("subprocess did not stop on {}".format(name))
2033
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002034 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002035 def test_CTRL_C_EVENT(self):
2036 from ctypes import wintypes
2037 import ctypes
2038
2039 # Make a NULL value by creating a pointer with no argument.
2040 NULL = ctypes.POINTER(ctypes.c_int)()
2041 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2042 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2043 wintypes.BOOL)
2044 SetConsoleCtrlHandler.restype = wintypes.BOOL
2045
2046 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002047 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002048 # by subprocesses.
2049 SetConsoleCtrlHandler(NULL, 0)
2050
2051 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2052
2053 def test_CTRL_BREAK_EVENT(self):
2054 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2055
2056
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        """Create SUB0/SUB1 directories and FILE0/FILE1 files under TESTFN."""
        self.created_paths = []
        for index in (0, 1):
            sub_name = 'SUB%d' % index
            leaf_name = 'FILE%d' % index
            leaf_path = os.path.join(support.TESTFN, leaf_name)
            os.makedirs(os.path.join(support.TESTFN, sub_name))
            with open(leaf_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % leaf_path)
            self.created_paths += [sub_name, leaf_name]
        # Tests compare against sorted listdir() output.
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_entries = sorted(os.listdir(support.TESTFN))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(bytes_entries, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        str_entries = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(entry) for entry in self.created_paths]
        self.assertEqual(bytes_entries, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002103
2104
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    # Link name and target as str, plus their bytes counterparts.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        """readlink() on a regular file raises OSError."""
        pathlike_target = FakePath(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(self.filelink_target)
        with self.assertRaises(OSError):
            os.readlink(pathlike_target)

    def test_missing_link(self):
        """readlink() on a missing path raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2149 self.assertIsInstance(path, bytes)
2150
2151
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and link-aware os.stat()/os.remove() on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so the
        # preconditions are still checked when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists: the missing link has no target, so exists() is False.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Try str first, then bytes, since bytes is a separate code path.
            variants = (
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            )
            for link_src, link_dest in variants:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    # Only the absence of a crash is being checked.
                    continue
                try:
                    os.remove(dest)
                except OSError:
                    pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002311
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions (not bare ``assert``) so the preconditions
        # are still enforced when the suite runs under -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2341
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        """Check the process handle count is unchanged by repeated lookups.

        nt._getfinalpathname() and os.stat() are called repeatedly on a set
        of names; the count reported by GetProcessHandleCount must be the
        same before and after.
        """
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = ctypes.wintypes.DWORD()

        def current_handle_count():
            # Refresh handle_count in place and return its value.
            ok = kernel.GetProcessHandleCount(hproc,
                                              ctypes.byref(handle_count))
            self.assertEqual(1, ok)
            return handle_count.value

        before_count = current_handle_count()

        # __file__ tests the success path; the \\?\ names are there for the
        # error path.  NOTE(review): the original note said "the first two"
        # but the list has three such names -- confirm which ones fail.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Failure is expected
                with contextlib.suppress(Exception):
                    nt._getfinalpathname(name)
                with contextlib.suppress(Exception):
                    os.stat(name)

        handle_delta = current_handle_count() - before_count

        self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002391
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check directory detection for a symlink whose target is link-relative."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` is stripped under -O, which would turn this
        # test into a no-op; use a real unittest assertion.
        self.assertTrue(os.path.isdir(src))
2422
2423
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() must
        # come back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for text in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(text)
            except UnicodeEncodeError:
                # This name cannot be encoded here; skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), text)
Victor Stinnere8d51452010-08-19 01:05:19 +00002437
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002438
Brett Cannonefb00c02012-02-29 18:31:31 -05002439
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be known to codecs.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2453
2454
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        executable = FakePath(argv[0])
        pid = os.spawnv(os.P_NOWAIT, executable, argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2471
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002472
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small exit script to TESTFN and return the argv to run it.

        Sets self.exitcode to the code the script exits with.  With
        *with_env*, also sets self.env (a copy of os.environ plus one unique
        key, stored in self.key) and makes the script read that key.  With
        *use_bytes*, the returned arguments are bytes, and so are the keys
        and values of self.env when *with_env* is also true.
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for use_bytes alone.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers the exit code is expected in the
            # high byte of the status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Run *spawn* with malformed and with valid environment entries.

        *spawn* is called as spawn(mode, path, args, env).  For each
        malformed entry it must either raise ValueError or return exit
        code 127.  A value containing '=' is valid and must reach the child
        unchanged.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            try:
                exitcode = spawn(os.P_WAIT, args[0], args, newenv)
            except ValueError:
                pass
            else:
                self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2651
Serhiy Storchaka77703942017-06-25 07:33:01 +03002652
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2661
2662
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raise our own priority value by one and read it back."""
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), original + 1)
        try:
            current = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and current <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s"
                                        % original)
            self.assertEqual(current, original + 1)
        finally:
            # Try to restore the starting priority; an EACCES refusal is
            # tolerated, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2685
2686
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening asyncore server that runs its event loop in its own thread.

    Each accepted connection is wrapped in a ``Handler`` (kept in
    ``handler_instance``) which greets the client with ``220 ready`` and
    records whatever the client sends.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that buffers the bytes it receives."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When False, received data is read from the socket but dropped.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything accumulated so far as a single bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound (the caller may pass port 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to exit and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        try:
            while self._active and asyncore.socket_map:
                # 'with' releases the lock even if a handler raises; the
                # handle_error() overrides re-raise, so exceptions can
                # escape from asyncore.loop().
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
        finally:
            # Always drop the registered channels, including on error, so
            # they do not linger in asyncore.socket_map.
            asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return False

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2771
2772
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Each test sends (part of) a file created in setUpClass() through
    # os.sendfile() to a SendfileTestServer and checks what the server's
    # handler received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # str.startswith() accepts a tuple of prefixes.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()
        self.server = None

    def sendfile_wrapper(self, *args, **kwargs):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET, i.e. disconnected) is propagated.
        """
        while True:
            try:
                return os.sendfile(*args, **kwargs)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a reserved word, so it has to be passed via ** unpacking.)
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512, b"y" * 256])
        self.assertLessEqual(sent, 512 + 256 + 4096)
        total_sent += sent
        offset = 4096
        while total_sent < len(expected_data):
            nbytes = min(len(expected_data) - total_sent, 4096)
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            self.assertLessEqual(sent, nbytes)
            total_sent += sent
            offset += sent

        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Hashes are compared instead of the raw bytes -- presumably to keep
        # the failure message short for this large payload.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            # Only the first 5 of the 6 file bytes are requested.
            os.sendfile(self.sockno, f.fileno(), 0, 5,
                        trailers=[b"123456", b"789"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde123456789")

    @requires_headers_trailers
    @requires_32b
    def test_headers_overflow_32bits(self):
        # 2**15 headers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                headers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @requires_32b
    def test_trailers_overflow_32bits(self):
        # 2**15 trailers of 2**16 bytes each add up to 2**31 bytes.
        self.server.handler_instance.accumulate = False
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, 0, 0,
                trailers=[b"x" * 2**16] * 2**15)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are accepted outcomes here; anything else is an
        # error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002977
2978
def supports_extended_attributes():
    # Probe for usable extended-attribute support: os.setxattr() must exist
    # and must succeed on a freshly created support.TESTFN.
    supported = hasattr(os, "setxattr")
    if supported:
        try:
            # "x" mode: the probe file must not already exist.
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    supported = False
        finally:
            support.unlink(support.TESTFN)
    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002993
2994
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Drive one full get/set/remove/list scenario on support.TESTFN.
        # 's' converts attribute names (str or os.fsencode); the four
        # callables are the xattr operations under test, and kwargs are
        # forwarded to get/set/remove.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_errno(code, operation, *op_args):
            # The operation must fail with OSError carrying the given errno.
            with self.assertRaises(OSError) as caught:
                operation(*op_args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing set yet.
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create with an empty value, then replace it.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name, XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr, path, s("user.test"),
                     b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr, path, s("user.test2"),
                     b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone again.
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        large_value = b"a"*1024
        setxattr(path, s("user.test"), large_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), large_value)
        removexattr(path, s("user.test"), **kwargs)

        # Finally set 100 attributes and check they are all listed.
        names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but each operation is given a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3078
3079
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _get_terminal_size_or_skip(self, *args):
        # Shared wrapper around os.get_terminal_size(*args): skip the test
        # when the size cannot be queried, re-raise any other OSError.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # Discard stty's stderr so a failing invocation (handled just
            # below) does not print its error message into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3123
3124
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # With an explicit MFD_CLOEXEC flag: the descriptor is valid,
        # non-inheritable and writable.
        flagged_fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged_fd, -1)
        self.addCleanup(os.close, flagged_fd)
        self.assertFalse(os.get_inheritable(flagged_fd))
        payload = b'memfd_create'
        with open(flagged_fd, "wb", closefd=False) as stream:
            stream.write(payload)
            self.assertEqual(stream.tell(), len(payload))

        # Without flags the descriptor is non-inheritable as well.
        default_fd = os.memfd_create("Hi")
        self.addCleanup(os.close, default_fd)
        self.assertFalse(os.get_inheritable(default_fd))
3139
3140
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        class Str(str):
            pass

        # Prefer the "difficult" file names from test.support when they are
        # available, otherwise fall back to the plain TESTFN.
        unencodable = support.TESTFN_UNENCODABLE
        decoded = unencodable if unencodable is not None else support.TESTFN
        undecodable = support.TESTFN_UNDECODABLE
        if undecodable is not None:
            encoded = undecodable
        else:
            encoded = os.fsencode(support.TESTFN)

        # The same name as str and as a str subclass ...
        self.unicode_filenames = [decoded, Str(decoded)]
        # ... and as bytes plus two other bytes-like wrappers.
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Every function listed below is called with each candidate name; an
        # OSError it raises must carry that very object as err.filename.
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (names to try, function, *extra positional arguments).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink,))

        for candidates, func, *extra_args in funcs:
            for candidate in candidates:
                is_plain = isinstance(candidate, (str, bytes))
                try:
                    if is_plain:
                        func(candidate, *extra_args)
                    else:
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(candidate, *extra_args)
                except OSError as err:
                    self.assertIs(err.filename, candidate, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3236
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (nothing to check, so skip) or
        # a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3245
Victor Stinnerdaf45552013-08-28 00:53:59 +02003246
3247class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003248 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003249 fd = os.open(__file__, os.O_RDONLY)
3250 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003251 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003252
Victor Stinnerdaf45552013-08-28 00:53:59 +02003253 os.set_inheritable(fd, True)
3254 self.assertEqual(os.get_inheritable(fd), True)
3255
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003256 @unittest.skipIf(fcntl is None, "need fcntl")
3257 def test_get_inheritable_cloexec(self):
3258 fd = os.open(__file__, os.O_RDONLY)
3259 self.addCleanup(os.close, fd)
3260 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003261
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003262 # clear FD_CLOEXEC flag
3263 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3264 flags &= ~fcntl.FD_CLOEXEC
3265 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003266
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003267 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003268
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003269 @unittest.skipIf(fcntl is None, "need fcntl")
3270 def test_set_inheritable_cloexec(self):
3271 fd = os.open(__file__, os.O_RDONLY)
3272 self.addCleanup(os.close, fd)
3273 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3274 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003275
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003276 os.set_inheritable(fd, True)
3277 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3278 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003279
Victor Stinnerdaf45552013-08-28 00:53:59 +02003280 def test_open(self):
3281 fd = os.open(__file__, os.O_RDONLY)
3282 self.addCleanup(os.close, fd)
3283 self.assertEqual(os.get_inheritable(fd), False)
3284
3285 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3286 def test_pipe(self):
3287 rfd, wfd = os.pipe()
3288 self.addCleanup(os.close, rfd)
3289 self.addCleanup(os.close, wfd)
3290 self.assertEqual(os.get_inheritable(rfd), False)
3291 self.assertEqual(os.get_inheritable(wfd), False)
3292
3293 def test_dup(self):
3294 fd1 = os.open(__file__, os.O_RDONLY)
3295 self.addCleanup(os.close, fd1)
3296
3297 fd2 = os.dup(fd1)
3298 self.addCleanup(os.close, fd2)
3299 self.assertEqual(os.get_inheritable(fd2), False)
3300
@unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
def test_dup2(self):
    """Check the inheritable flag produced by os.dup2().

    Without the keyword the target descriptor ends up inheritable; with
    inheritable=False it must not be.
    """
    def open_descriptor():
        # Open this file read-only and make sure the descriptor is closed
        # when the test finishes.
        opened = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, opened)
        return opened

    source_fd = open_descriptor()

    # inheritable by default
    default_target = open_descriptor()
    returned = os.dup2(source_fd, default_target)
    self.assertEqual(returned, default_target)
    self.assertTrue(os.get_inheritable(default_target))

    # force non-inheritable
    forced_target = open_descriptor()
    returned = os.dup2(source_fd, forced_target, inheritable=False)
    self.assertEqual(returned, forced_target)
    self.assertFalse(os.get_inheritable(forced_target))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003317
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
def test_openpty(self):
    """Both descriptors returned by os.openpty() must not be inheritable."""
    pty_pair = os.openpty()

    # Register the cleanups first (master, then slave) so both descriptors
    # get closed even if an assertion below fails.
    for pty_fd in pty_pair:
        self.addCleanup(os.close, pty_fd)

    for pty_fd in pty_pair:
        self.assertEqual(os.get_inheritable(pty_fd), False)
3325
3326
class PathTConverterTests(unittest.TestCase):
    """Check how a selection of os functions convert their path argument.

    Each listed function is called with str, bytes and path-like arguments,
    with a path-like object whose __fspath__() returns an int, and with a
    raw file descriptor.
    """

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        """Call every listed function with each supported path flavour."""
        str_filename = support.TESTFN
        if os.name == 'nt':
            # The bytes variants are not exercised on Windows.
            bytes_fspath = bytes_filename = None
        else:
            # Use the filesystem encoding rather than ASCII so that a
            # non-ASCII temporary file name does not break the test with a
            # UnicodeEncodeError before anything is checked.
            bytes_filename = os.fsencode(support.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the descriptor, then unlink.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # __fspath__() returning an int must be rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)

    def test_path_t_converter_and_custom_class(self):
        """The TypeError message must name the type __fspath__() returned."""
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        with self.assertRaisesRegex(TypeError, msg % r'int'):
            os.stat(FakePath(2))
        with self.assertRaisesRegex(TypeError, msg % r'float'):
            os.stat(FakePath(2.34))
        with self.assertRaisesRegex(TypeError, msg % r'object'):
            os.stat(FakePath(object()))
3391
Brett Cannon3f9183b2016-08-26 14:44:48 -07003392
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)

        # The freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch the mode off and back on, reading it back after each change.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
3406
3407
Yury Selivanov97e2e062014-09-26 12:33:06 -04003408
class ExportsTests(unittest.TestCase):
    """Sanity check of the names published through os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3413
3414
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    setUp() creates a fresh directory for every test and registers a cleanup
    that removes it again.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the test directory and return its full path.

        A bytes *name* is joined to the bytes form of the directory, so the
        returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Return a {name: DirEntry} mapping for the test directory.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        With a true *skip_fields* only the ``st_*`` attributes are compared
        one by one, leaving out st_dev, st_ino and st_nlink; otherwise the
        two results must compare equal as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Validate *entry* against the caller's expectations and os.stat().

        *name* is the expected entry name; *is_dir*, *is_file* and
        *is_symlink* are the values the corresponding DirEntry methods are
        expected to return (symlinks followed for is_dir()/is_file()).
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations supplied by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check against a stat() that follows symlinks.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Cross-check against a stat() that does not follow symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows some fields are skipped (see assert_stat_equal()).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the test directory, expected to be *name*.

        The directory is scanned with a bytes path when *name* is bytes.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry scandir() yields for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs when skipping.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            with self.assertWarns(DeprecationWarning):
                entries = list(os.scandir(path_bytes))
            self.assertEqual(len(entries), 1, entries)
            entry = entries[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path,
                             os.fsencode(os.path.join(self.path, 'file.txt')))
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning a descriptor, the path is just the name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # Dropping an iterator that was neither exhausted nor closed must
        # emit a ResourceWarning.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3747
Victor Stinner6036e442015-03-08 01:58:04 +01003748
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Kept as a class attribute so that a subclass can swap in the pure
    # Python implementation when a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for original in ("path/like/object", b"path/like/object"):
            wrapped = FakePath(original)

            self.assertEqual(original, self.fspath(wrapped))
            self.assertEqual(b"path/like/object", os.fsencode(wrapped))
            self.assertEqual("path/like/object", os.fsdecode(wrapped))

    def test_pathlike(self):
        converted = self.fspath(FakePath('#feelthegil'))
        self.assertEqual('#feelthegil', converted)
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        # None of these objects implements __fspath__().
        for candidate in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(candidate)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        returns_int = FakePath(42)
        with self.assertRaises(TypeError):
            self.fspath(returns_int)

        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        with self.assertRaises(TypeError):
            self.fspath(not_callable())

        # __fspath__ raises an exception.
        raising = FakePath(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(raising)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003794
Victor Stinnerc29b5852017-11-02 07:28:27 -07003795
class TimesTests(unittest.TestCase):
    """Checks on the result object returned by os.times()."""

    def test_times(self):
        snapshot = os.times()
        self.assertIsInstance(snapshot, os.times_result)

        # Every named field must be exposed as a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field_name in field_names:
            self.assertIsInstance(getattr(snapshot, field_name), float)

        if os.name != 'nt':
            return

        # On Windows these three fields are expected to be zero.
        for field_name in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(snapshot, field_name), 0)
3810
3811
# A separate run against os._fspath() is only defined when the os module
# provides that name (i.e. a C version of fspath exists); otherwise
# TestPEP519 has already tested the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Run the TestPEP519 tests against the pure Python os._fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003820
3821
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()