blob: 2cfcebcc86488f7bb9ea7b7975e2922213ee8381 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020064from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
Berker Peksag4af23d72016-09-15 20:32:44 +030084def requires_os_func(name):
85 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
86
87
Victor Stinnerae39d232016-03-24 17:12:55 +010088def create_file(filename, content=b'content'):
89 with open(filename, "xb", 0) as fp:
90 fp.write(content)
91
92
Thomas Wouters0e3f5912006-08-11 14:57:12 +000093# Tests creating TESTFN
94class FileTests(unittest.TestCase):
95 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000096 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000097 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000098 tearDown = setUp
99
100 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000101 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000102 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000103 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000104
Christian Heimesfdab48e2008-01-20 09:06:41 +0000105 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000106 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
107 # We must allocate two consecutive file descriptors, otherwise
108 # it will mess up other file descriptors (perhaps even the three
109 # standard ones).
110 second = os.dup(first)
111 try:
112 retries = 0
113 while second != first + 1:
114 os.close(first)
115 retries += 1
116 if retries > 10:
117 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000118 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000119 first, second = second, os.dup(second)
120 finally:
121 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000122 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000123 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000124 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000126 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000127 def test_rename(self):
128 path = support.TESTFN
129 old = sys.getrefcount(path)
130 self.assertRaises(TypeError, os.rename, path, 0)
131 new = sys.getrefcount(path)
132 self.assertEqual(old, new)
133
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000134 def test_read(self):
135 with open(support.TESTFN, "w+b") as fobj:
136 fobj.write(b"spam")
137 fobj.flush()
138 fd = fobj.fileno()
139 os.lseek(fd, 0, 0)
140 s = os.read(fd, 4)
141 self.assertEqual(type(s), bytes)
142 self.assertEqual(s, b"spam")
143
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200144 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200145 # Skip the test on 32-bit platforms: the number of bytes must fit in a
146 # Py_ssize_t type
147 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
148 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200149 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
150 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200151 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100152 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200153
154 # Issue #21932: Make sure that os.read() does not raise an
155 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200156 with open(support.TESTFN, "rb") as fp:
157 data = os.read(fp.fileno(), size)
158
Victor Stinner8c663fd2017-11-08 14:44:44 -0800159 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200160 # operating system is free to return less bytes than requested.
161 self.assertEqual(data, b'test')
162
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000163 def test_write(self):
164 # os.write() accepts bytes- and buffer-like objects but not strings
165 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
166 self.assertRaises(TypeError, os.write, fd, "beans")
167 os.write(fd, b"bacon\n")
168 os.write(fd, bytearray(b"eggs\n"))
169 os.write(fd, memoryview(b"spam\n"))
170 os.close(fd)
171 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000172 self.assertEqual(fobj.read().splitlines(),
173 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000174
Victor Stinnere0daff12011-03-20 23:36:35 +0100175 def write_windows_console(self, *args):
176 retcode = subprocess.call(args,
177 # use a new console to not flood the test output
178 creationflags=subprocess.CREATE_NEW_CONSOLE,
179 # use a shell to hide the console window (SW_HIDE)
180 shell=True)
181 self.assertEqual(retcode, 0)
182
183 @unittest.skipUnless(sys.platform == 'win32',
184 'test specific to the Windows console')
185 def test_write_windows_console(self):
186 # Issue #11395: the Windows console returns an error (12: not enough
187 # space error) on writing into stdout if stdout mode is binary and the
188 # length is greater than 66,000 bytes (or less, depending on heap
189 # usage).
190 code = "print('x' * 100000)"
191 self.write_windows_console(sys.executable, "-c", code)
192 self.write_windows_console(sys.executable, "-u", "-c", code)
193
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000194 def fdopen_helper(self, *args):
195 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200196 f = os.fdopen(fd, *args)
197 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000198
199 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200200 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
201 os.close(fd)
202
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000203 self.fdopen_helper()
204 self.fdopen_helper('r')
205 self.fdopen_helper('r', 100)
206
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100207 def test_replace(self):
208 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100209 self.addCleanup(support.unlink, support.TESTFN)
210 self.addCleanup(support.unlink, TESTFN2)
211
212 create_file(support.TESTFN, b"1")
213 create_file(TESTFN2, b"2")
214
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100215 os.replace(support.TESTFN, TESTFN2)
216 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
217 with open(TESTFN2, 'r') as f:
218 self.assertEqual(f.read(), "1")
219
Martin Panterbf19d162015-09-09 01:01:13 +0000220 def test_open_keywords(self):
221 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
222 dir_fd=None)
223 os.close(f)
224
225 def test_symlink_keywords(self):
226 symlink = support.get_attribute(os, "symlink")
227 try:
228 symlink(src='target', dst=support.TESTFN,
229 target_is_directory=False, dir_fd=None)
230 except (NotImplementedError, OSError):
231 pass # No OS support or unprivileged user
232
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200233
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234# Test attributes on return values from os.*stat* family.
235class StatAttributeTests(unittest.TestCase):
236 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200237 self.fname = support.TESTFN
238 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100239 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240
Serhiy Storchaka43767632013-11-03 21:31:38 +0200241 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000242 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000243 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244
245 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(result[stat.ST_SIZE], 3)
247 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 # Make sure all the attributes are there
250 members = dir(result)
251 for name in dir(stat):
252 if name[:3] == 'ST_':
253 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000254 if name.endswith("TIME"):
255 def trunc(x): return int(x)
256 else:
257 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000260 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700263 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700264 for name in 'st_atime st_mtime st_ctime'.split():
265 floaty = int(getattr(result, name) * 100000)
266 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700267 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 try:
270 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 except IndexError:
273 pass
274
275 # Make sure that assignment fails
276 try:
277 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200278 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000279 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 pass
281
282 try:
283 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200284 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000285 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 pass
287
288 try:
289 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except AttributeError:
292 pass
293
294 # Use the stat_result constructor with a too-short tuple.
295 try:
296 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 except TypeError:
299 pass
300
Ezio Melotti42da6632011-03-15 05:18:48 +0200301 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 try:
303 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
304 except TypeError:
305 pass
306
Antoine Pitrou38425292010-09-21 18:19:07 +0000307 def test_stat_attributes(self):
308 self.check_stat_attributes(self.fname)
309
310 def test_stat_attributes_bytes(self):
311 try:
312 fname = self.fname.encode(sys.getfilesystemencoding())
313 except UnicodeEncodeError:
314 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700329 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100340 self.assertTrue(isinstance(result.f_fsid, int))
341
342 # Test that the size of the tuple doesn't change
343 self.assertEqual(len(result), 10)
344
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700374 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Berker Peksag0b4dc482016-09-17 15:49:59 +0300434 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
435 def test_access_denied(self):
436 # Default to FindFirstFile WIN32_FIND_DATA when access is
437 # denied. See issue 28075.
438 # os.environ['TEMP'] should be located on a volume that
439 # supports file ACLs.
440 fname = os.path.join(os.environ['TEMP'], self.fname)
441 self.addCleanup(support.unlink, fname)
442 create_file(fname, b'ABC')
443 # Deny the right to [S]YNCHRONIZE on the file to
444 # force CreateFile to fail with ERROR_ACCESS_DENIED.
445 DETACHED_PROCESS = 8
446 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500447 # bpo-30584: Use security identifier *S-1-5-32-545 instead
448 # of localized "Users" to not depend on the locale.
449 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300450 creationflags=DETACHED_PROCESS
451 )
452 result = os.stat(fname)
453 self.assertNotEqual(result.st_size, 0)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
456class UtimeTests(unittest.TestCase):
457 def setUp(self):
458 self.dirname = support.TESTFN
459 self.fname = os.path.join(self.dirname, "f1")
460
461 self.addCleanup(support.rmtree, self.dirname)
462 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100463 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200464
Victor Stinner47aacc82015-06-12 17:26:23 +0200465 def support_subsecond(self, filename):
466 # Heuristic to check if the filesystem supports timestamp with
467 # subsecond resolution: check if float and int timestamps are different
468 st = os.stat(filename)
469 return ((st.st_atime != st[7])
470 or (st.st_mtime != st[8])
471 or (st.st_ctime != st[9]))
472
473 def _test_utime(self, set_time, filename=None):
474 if not filename:
475 filename = self.fname
476
477 support_subsecond = self.support_subsecond(filename)
478 if support_subsecond:
479 # Timestamp with a resolution of 1 microsecond (10^-6).
480 #
481 # The resolution of the C internal function used by os.utime()
482 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
483 # test with a resolution of 1 ns requires more work:
484 # see the issue #15745.
485 atime_ns = 1002003000 # 1.002003 seconds
486 mtime_ns = 4005006000 # 4.005006 seconds
487 else:
488 # use a resolution of 1 second
489 atime_ns = 5 * 10**9
490 mtime_ns = 8 * 10**9
491
492 set_time(filename, (atime_ns, mtime_ns))
493 st = os.stat(filename)
494
495 if support_subsecond:
496 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
497 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
498 else:
499 self.assertEqual(st.st_atime, atime_ns * 1e-9)
500 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
501 self.assertEqual(st.st_atime_ns, atime_ns)
502 self.assertEqual(st.st_mtime_ns, mtime_ns)
503
504 def test_utime(self):
505 def set_time(filename, ns):
506 # test the ns keyword parameter
507 os.utime(filename, ns=ns)
508 self._test_utime(set_time)
509
510 @staticmethod
511 def ns_to_sec(ns):
512 # Convert a number of nanosecond (int) to a number of seconds (float).
513 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
514 # issue, os.utime() rounds towards minus infinity.
515 return (ns * 1e-9) + 0.5e-9
516
517 def test_utime_by_indexed(self):
518 # pass times as floating point seconds as the second indexed parameter
519 def set_time(filename, ns):
520 atime_ns, mtime_ns = ns
521 atime = self.ns_to_sec(atime_ns)
522 mtime = self.ns_to_sec(mtime_ns)
523 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
524 # or utime(time_t)
525 os.utime(filename, (atime, mtime))
526 self._test_utime(set_time)
527
528 def test_utime_by_times(self):
529 def set_time(filename, ns):
530 atime_ns, mtime_ns = ns
531 atime = self.ns_to_sec(atime_ns)
532 mtime = self.ns_to_sec(mtime_ns)
533 # test the times keyword parameter
534 os.utime(filename, times=(atime, mtime))
535 self._test_utime(set_time)
536
537 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
538 "follow_symlinks support for utime required "
539 "for this test.")
540 def test_utime_nofollow_symlinks(self):
541 def set_time(filename, ns):
542 # use follow_symlinks=False to test utimensat(timespec)
543 # or lutimes(timeval)
544 os.utime(filename, ns=ns, follow_symlinks=False)
545 self._test_utime(set_time)
546
547 @unittest.skipUnless(os.utime in os.supports_fd,
548 "fd support for utime required for this test.")
549 def test_utime_fd(self):
550 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100551 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200552 # use a file descriptor to test futimens(timespec)
553 # or futimes(timeval)
554 os.utime(fp.fileno(), ns=ns)
555 self._test_utime(set_time)
556
557 @unittest.skipUnless(os.utime in os.supports_dir_fd,
558 "dir_fd support for utime required for this test.")
559 def test_utime_dir_fd(self):
560 def set_time(filename, ns):
561 dirname, name = os.path.split(filename)
562 dirfd = os.open(dirname, os.O_RDONLY)
563 try:
564 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
565 os.utime(name, dir_fd=dirfd, ns=ns)
566 finally:
567 os.close(dirfd)
568 self._test_utime(set_time)
569
570 def test_utime_directory(self):
571 def set_time(filename, ns):
572 # test calling os.utime() on a directory
573 os.utime(filename, ns=ns)
574 self._test_utime(set_time, filename=self.dirname)
575
576 def _test_utime_current(self, set_time):
577 # Get the system clock
578 current = time.time()
579
580 # Call os.utime() to set the timestamp to the current system clock
581 set_time(self.fname)
582
583 if not self.support_subsecond(self.fname):
584 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700585 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200586 # On Windows, the usual resolution of time.time() is 15.6 ms.
587 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700588 #
589 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
590 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200591 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200592 st = os.stat(self.fname)
593 msg = ("st_time=%r, current=%r, dt=%r"
594 % (st.st_mtime, current, st.st_mtime - current))
595 self.assertAlmostEqual(st.st_mtime, current,
596 delta=delta, msg=msg)
597
598 def test_utime_current(self):
599 def set_time(filename):
600 # Set to the current time in the new way
601 os.utime(self.fname)
602 self._test_utime_current(set_time)
603
604 def test_utime_current_old(self):
605 def set_time(filename):
606 # Set to the current time in the old explicit way.
607 os.utime(self.fname, None)
608 self._test_utime_current(set_time)
609
610 def get_file_system(self, path):
611 if sys.platform == 'win32':
612 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
613 import ctypes
614 kernel32 = ctypes.windll.kernel32
615 buf = ctypes.create_unicode_buffer("", 100)
616 ok = kernel32.GetVolumeInformationW(root, None, 0,
617 None, None, None,
618 buf, len(buf))
619 if ok:
620 return buf.value
621 # return None if the filesystem is unknown
622
623 def test_large_time(self):
624 # Many filesystems are limited to the year 2038. At least, the test
625 # pass with NTFS filesystem.
626 if self.get_file_system(self.dirname) != "NTFS":
627 self.skipTest("requires NTFS")
628
629 large = 5000000000 # some day in 2128
630 os.utime(self.fname, (large, large))
631 self.assertEqual(os.stat(self.fname).st_mtime, large)
632
633 def test_utime_invalid_arguments(self):
634 # seconds and nanoseconds parameters are mutually exclusive
635 with self.assertRaises(ValueError):
636 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200637 with self.assertRaises(TypeError):
638 os.utime(self.fname, [5, 5])
639 with self.assertRaises(TypeError):
640 os.utime(self.fname, (5,))
641 with self.assertRaises(TypeError):
642 os.utime(self.fname, (5, 5, 5))
643 with self.assertRaises(TypeError):
644 os.utime(self.fname, ns=[5, 5])
645 with self.assertRaises(TypeError):
646 os.utime(self.fname, ns=(5,))
647 with self.assertRaises(TypeError):
648 os.utime(self.fname, ns=(5, 5, 5))
649
650 if os.utime not in os.supports_follow_symlinks:
651 with self.assertRaises(NotImplementedError):
652 os.utime(self.fname, (5, 5), follow_symlinks=False)
653 if os.utime not in os.supports_fd:
654 with open(self.fname, 'wb', 0) as fp:
655 with self.assertRaises(TypeError):
656 os.utime(fp.fileno(), (5, 5))
657 if os.utime not in os.supports_dir_fd:
658 with self.assertRaises(NotImplementedError):
659 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200660
Oren Milman0bd1a2d2018-09-12 22:14:35 +0300661 @support.cpython_only
662 def test_issue31577(self):
663 # The interpreter shouldn't crash in case utime() received a bad
664 # ns argument.
665 def get_bad_int(divmod_ret_val):
666 class BadInt:
667 def __divmod__(*args):
668 return divmod_ret_val
669 return BadInt()
670 with self.assertRaises(TypeError):
671 os.utime(self.fname, ns=(get_bad_int(42), 1))
672 with self.assertRaises(TypeError):
673 os.utime(self.fname, ns=(get_bad_int(()), 1))
674 with self.assertRaises(TypeError):
675 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
676
Victor Stinner47aacc82015-06-12 17:26:23 +0200677
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000678from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000679
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000680class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000681 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000682 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000683
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000684 def setUp(self):
685 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000686 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000687 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000688 for key, value in self._reference().items():
689 os.environ[key] = value
690
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000691 def tearDown(self):
692 os.environ.clear()
693 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000694 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000695 os.environb.clear()
696 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000697
Christian Heimes90333392007-11-01 19:08:42 +0000698 def _reference(self):
699 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
700
701 def _empty_mapping(self):
702 os.environ.clear()
703 return os.environ
704
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000705 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200706 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
707 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000708 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000709 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300710 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200711 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300712 value = popen.read().strip()
713 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000714
Xavier de Gayed1415312016-07-22 12:15:29 +0200715 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
716 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000717 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200718 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
719 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300720 it = iter(popen)
721 self.assertEqual(next(it), "line1\n")
722 self.assertEqual(next(it), "line2\n")
723 self.assertEqual(next(it), "line3\n")
724 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000725
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000726 # Verify environ keys and values from the OS are of the
727 # correct str type.
728 def test_keyvalue_types(self):
729 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000730 self.assertEqual(type(key), str)
731 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000732
Christian Heimes90333392007-11-01 19:08:42 +0000733 def test_items(self):
734 for key, value in self._reference().items():
735 self.assertEqual(os.environ.get(key), value)
736
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000737 # Issue 7310
738 def test___repr__(self):
739 """Check that the repr() of os.environ looks like environ({...})."""
740 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000741 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
742 '{!r}: {!r}'.format(key, value)
743 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000744
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000745 def test_get_exec_path(self):
746 defpath_list = os.defpath.split(os.pathsep)
747 test_path = ['/monty', '/python', '', '/flying/circus']
748 test_env = {'PATH': os.pathsep.join(test_path)}
749
750 saved_environ = os.environ
751 try:
752 os.environ = dict(test_env)
753 # Test that defaulting to os.environ works.
754 self.assertSequenceEqual(test_path, os.get_exec_path())
755 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
756 finally:
757 os.environ = saved_environ
758
759 # No PATH environment variable
760 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
761 # Empty PATH environment variable
762 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
763 # Supplied PATH environment variable
764 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
765
Victor Stinnerb745a742010-05-18 17:17:23 +0000766 if os.supports_bytes_environ:
767 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000768 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000769 # ignore BytesWarning warning
770 with warnings.catch_warnings(record=True):
771 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000772 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000773 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000774 pass
775 else:
776 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000777
778 # bytes key and/or value
779 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
780 ['abc'])
781 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
782 ['abc'])
783 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
784 ['abc'])
785
786 @unittest.skipUnless(os.supports_bytes_environ,
787 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000788 def test_environb(self):
789 # os.environ -> os.environb
790 value = 'euro\u20ac'
791 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000792 value_bytes = value.encode(sys.getfilesystemencoding(),
793 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000794 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000795 msg = "U+20AC character is not encodable to %s" % (
796 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000797 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000798 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000799 self.assertEqual(os.environ['unicode'], value)
800 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000801
802 # os.environb -> os.environ
803 value = b'\xff'
804 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000805 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000806 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000807 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000808
Victor Stinner13ff2452018-01-22 18:32:50 +0100809 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100810 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100811 def test_unset_error(self):
812 if sys.platform == "win32":
813 # an environment variable is limited to 32,767 characters
814 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100815 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100816 else:
817 # "=" is not allowed in a variable name
818 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100819 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100820
Victor Stinner6d101392013-04-14 16:35:04 +0200821 def test_key_type(self):
822 missing = 'missingkey'
823 self.assertNotIn(missing, os.environ)
824
Victor Stinner839e5ea2013-04-14 16:43:03 +0200825 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200826 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200827 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200828 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200829
Victor Stinner839e5ea2013-04-14 16:43:03 +0200830 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200831 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200832 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200833 self.assertTrue(cm.exception.__suppress_context__)
834
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300835 def _test_environ_iteration(self, collection):
836 iterator = iter(collection)
837 new_key = "__new_key__"
838
839 next(iterator) # start iteration over os.environ.items
840
841 # add a new key in os.environ mapping
842 os.environ[new_key] = "test_environ_iteration"
843
844 try:
845 next(iterator) # force iteration over modified mapping
846 self.assertEqual(os.environ[new_key], "test_environ_iteration")
847 finally:
848 del os.environ[new_key]
849
850 def test_iter_error_when_changing_os_environ(self):
851 self._test_environ_iteration(os.environ)
852
853 def test_iter_error_when_changing_os_environ_items(self):
854 self._test_environ_iteration(os.environ.items())
855
856 def test_iter_error_when_changing_os_environ_values(self):
857 self._test_environ_iteration(os.environ.values())
858
Victor Stinner6d101392013-04-14 16:35:04 +0200859
Tim Petersc4e09402003-04-25 07:11:48 +0000860class WalkTests(unittest.TestCase):
861 """Tests for os.walk()."""
862
Victor Stinner0561c532015-03-12 10:28:24 +0100863 # Wrapper to hide minor differences between os.walk and os.fwalk
864 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200865 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200866 if 'follow_symlinks' in kwargs:
867 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200868 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100869
Charles-François Natali7372b062012-02-05 15:15:38 +0100870 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100871 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100872 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000873
874 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000875 # TESTFN/
876 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000877 # tmp1
878 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000879 # tmp2
880 # SUB11/ no kids
881 # SUB2/ a file kid and a dirsymlink kid
882 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300883 # SUB21/ not readable
884 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000885 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200886 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300887 # broken_link2
888 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000889 # TEST2/
890 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100891 self.walk_path = join(support.TESTFN, "TEST1")
892 self.sub1_path = join(self.walk_path, "SUB1")
893 self.sub11_path = join(self.sub1_path, "SUB11")
894 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300895 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100896 tmp1_path = join(self.walk_path, "tmp1")
897 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000898 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300899 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100900 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000901 t2_path = join(support.TESTFN, "TEST2")
902 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200903 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300904 broken_link2_path = join(sub2_path, "broken_link2")
905 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000906
907 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100908 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000909 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300910 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000911 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100912
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300913 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100914 with open(path, "x") as f:
915 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000916
Victor Stinner0561c532015-03-12 10:28:24 +0100917 if support.can_symlink():
918 os.symlink(os.path.abspath(t2_path), self.link_path)
919 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300920 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
921 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300922 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300923 ["broken_link", "broken_link2", "broken_link3",
924 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100925 else:
pxinwr3e028b22019-02-15 13:04:47 +0800926 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100927
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300928 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300929 try:
930 os.listdir(sub21_path)
931 except PermissionError:
932 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
933 else:
934 os.chmod(sub21_path, stat.S_IRWXU)
935 os.unlink(tmp5_path)
936 os.rmdir(sub21_path)
937 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300938
Victor Stinner0561c532015-03-12 10:28:24 +0100939 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000940 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300941 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100942
Tim Petersc4e09402003-04-25 07:11:48 +0000943 self.assertEqual(len(all), 4)
944 # We can't know which order SUB1 and SUB2 will appear in.
945 # Not flipped: TESTFN, SUB1, SUB11, SUB2
946 # flipped: TESTFN, SUB2, SUB1, SUB11
947 flipped = all[0][1][0] != "SUB1"
948 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200949 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300950 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100951 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
952 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
953 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
954 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000955
Brett Cannon3f9183b2016-08-26 14:44:48 -0700956 def test_walk_prune(self, walk_path=None):
957 if walk_path is None:
958 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000959 # Prune the search.
960 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700961 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000962 all.append((root, dirs, files))
963 # Don't descend into SUB1.
964 if 'SUB1' in dirs:
965 # Note that this also mutates the dirs we appended to all!
966 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000967
Victor Stinner0561c532015-03-12 10:28:24 +0100968 self.assertEqual(len(all), 2)
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200969 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100970
971 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300972 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100973 self.assertEqual(all[1], self.sub2_tree)
974
Brett Cannon3f9183b2016-08-26 14:44:48 -0700975 def test_file_like_path(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200976 self.test_walk_prune(FakePath(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700977
Victor Stinner0561c532015-03-12 10:28:24 +0100978 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000979 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100980 all = list(self.walk(self.walk_path, topdown=False))
981
Victor Stinner53b0a412016-03-26 01:12:36 +0100982 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000983 # We can't know which order SUB1 and SUB2 will appear in.
984 # Not flipped: SUB11, SUB1, SUB2, TESTFN
985 # flipped: SUB2, SUB11, SUB1, TESTFN
986 flipped = all[3][1][0] != "SUB1"
987 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200988 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300989 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100990 self.assertEqual(all[3],
991 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
992 self.assertEqual(all[flipped],
993 (self.sub11_path, [], []))
994 self.assertEqual(all[flipped + 1],
995 (self.sub1_path, ["SUB11"], ["tmp2"]))
996 self.assertEqual(all[2 - 2 * flipped],
997 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000998
Victor Stinner0561c532015-03-12 10:28:24 +0100999 def test_walk_symlink(self):
1000 if not support.can_symlink():
1001 self.skipTest("need symlink support")
1002
1003 # Walk, following symlinks.
1004 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1005 for root, dirs, files in walk_it:
1006 if root == self.link_path:
1007 self.assertEqual(dirs, [])
1008 self.assertEqual(files, ["tmp4"])
1009 break
1010 else:
1011 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001012
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001013 def test_walk_bad_dir(self):
1014 # Walk top-down.
1015 errors = []
1016 walk_it = self.walk(self.walk_path, onerror=errors.append)
1017 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001018 self.assertEqual(errors, [])
1019 dir1 = 'SUB1'
1020 path1 = os.path.join(root, dir1)
1021 path1new = os.path.join(root, dir1 + '.new')
1022 os.rename(path1, path1new)
1023 try:
1024 roots = [r for r, d, f in walk_it]
1025 self.assertTrue(errors)
1026 self.assertNotIn(path1, roots)
1027 self.assertNotIn(path1new, roots)
1028 for dir2 in dirs:
1029 if dir2 != dir1:
1030 self.assertIn(os.path.join(root, dir2), roots)
1031 finally:
1032 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001033
Charles-François Natali7372b062012-02-05 15:15:38 +01001034
1035@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1036class FwalkTests(WalkTests):
1037 """Tests for os.fwalk()."""
1038
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001039 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001040 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001041 yield (root, dirs, files)
1042
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001043 def fwalk(self, *args, **kwargs):
1044 return os.fwalk(*args, **kwargs)
1045
Larry Hastingsc48fe982012-06-25 04:49:05 -07001046 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1047 """
1048 compare with walk() results.
1049 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001050 walk_kwargs = walk_kwargs.copy()
1051 fwalk_kwargs = fwalk_kwargs.copy()
1052 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1053 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1054 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001055
Charles-François Natali7372b062012-02-05 15:15:38 +01001056 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001057 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001058 expected[root] = (set(dirs), set(files))
1059
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001060 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001061 self.assertIn(root, expected)
1062 self.assertEqual(expected[root], (set(dirs), set(files)))
1063
Larry Hastingsc48fe982012-06-25 04:49:05 -07001064 def test_compare_to_walk(self):
1065 kwargs = {'top': support.TESTFN}
1066 self._compare_to_walk(kwargs, kwargs)
1067
Charles-François Natali7372b062012-02-05 15:15:38 +01001068 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001069 try:
1070 fd = os.open(".", os.O_RDONLY)
1071 walk_kwargs = {'top': support.TESTFN}
1072 fwalk_kwargs = walk_kwargs.copy()
1073 fwalk_kwargs['dir_fd'] = fd
1074 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1075 finally:
1076 os.close(fd)
1077
1078 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001079 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001080 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1081 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001082 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001083 # check that the FD is valid
1084 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001085 # redundant check
1086 os.stat(rootfd)
1087 # check that listdir() returns consistent information
1088 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001089
1090 def test_fd_leak(self):
1091 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1092 # we both check that calling fwalk() a large number of times doesn't
1093 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1094 minfd = os.dup(1)
1095 os.close(minfd)
1096 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001097 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001098 pass
1099 newfd = os.dup(1)
1100 self.addCleanup(os.close, newfd)
1101 self.assertEqual(newfd, minfd)
1102
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001103class BytesWalkTests(WalkTests):
1104 """Tests for os.walk() with bytes."""
1105 def walk(self, top, **kwargs):
1106 if 'follow_symlinks' in kwargs:
1107 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1108 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1109 root = os.fsdecode(broot)
1110 dirs = list(map(os.fsdecode, bdirs))
1111 files = list(map(os.fsdecode, bfiles))
1112 yield (root, dirs, files)
1113 bdirs[:] = list(map(os.fsencode, dirs))
1114 bfiles[:] = list(map(os.fsencode, files))
1115
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001116@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1117class BytesFwalkTests(FwalkTests):
1118 """Tests for os.walk() with bytes."""
1119 def fwalk(self, top='.', *args, **kwargs):
1120 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1121 root = os.fsdecode(broot)
1122 dirs = list(map(os.fsdecode, bdirs))
1123 files = list(map(os.fsdecode, bfiles))
1124 yield (root, dirs, files, topfd)
1125 bdirs[:] = list(map(os.fsencode, dirs))
1126 bfiles[:] = list(map(os.fsencode, files))
1127
Charles-François Natali7372b062012-02-05 15:15:38 +01001128
Guido van Rossume7ba4952007-06-06 23:52:48 +00001129class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001130 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001131 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001132
1133 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001134 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001135 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1136 os.makedirs(path) # Should work
1137 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1138 os.makedirs(path)
1139
1140 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001141 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001142 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1143 os.makedirs(path)
1144 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1145 'dir5', 'dir6')
1146 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001147
Serhiy Storchakae304e332017-03-24 13:27:42 +02001148 def test_mode(self):
1149 with support.temp_umask(0o002):
1150 base = support.TESTFN
1151 parent = os.path.join(base, 'dir1')
1152 path = os.path.join(parent, 'dir2')
1153 os.makedirs(path, 0o555)
1154 self.assertTrue(os.path.exists(path))
1155 self.assertTrue(os.path.isdir(path))
1156 if os.name != 'nt':
Benjamin Peterson84db4a92018-09-13 12:00:14 -07001157 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1158 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
Serhiy Storchakae304e332017-03-24 13:27:42 +02001159
Terry Reedy5a22b652010-12-02 07:05:56 +00001160 def test_exist_ok_existing_directory(self):
1161 path = os.path.join(support.TESTFN, 'dir1')
1162 mode = 0o777
1163 old_mask = os.umask(0o022)
1164 os.makedirs(path, mode)
1165 self.assertRaises(OSError, os.makedirs, path, mode)
1166 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001167 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001168 os.makedirs(path, mode=mode, exist_ok=True)
1169 os.umask(old_mask)
1170
Martin Pantera82642f2015-11-19 04:48:44 +00001171 # Issue #25583: A drive root could raise PermissionError on Windows
1172 os.makedirs(os.path.abspath('/'), exist_ok=True)
1173
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001174 def test_exist_ok_s_isgid_directory(self):
1175 path = os.path.join(support.TESTFN, 'dir1')
1176 S_ISGID = stat.S_ISGID
1177 mode = 0o777
1178 old_mask = os.umask(0o022)
1179 try:
1180 existing_testfn_mode = stat.S_IMODE(
1181 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001182 try:
1183 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001184 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001185 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001186 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1187 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1188 # The os should apply S_ISGID from the parent dir for us, but
1189 # this test need not depend on that behavior. Be explicit.
1190 os.makedirs(path, mode | S_ISGID)
1191 # http://bugs.python.org/issue14992
1192 # Should not fail when the bit is already set.
1193 os.makedirs(path, mode, exist_ok=True)
1194 # remove the bit.
1195 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001196 # May work even when the bit is not already set when demanded.
1197 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001198 finally:
1199 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001200
1201 def test_exist_ok_existing_regular_file(self):
1202 base = support.TESTFN
1203 path = os.path.join(support.TESTFN, 'dir1')
1204 f = open(path, 'w')
1205 f.write('abc')
1206 f.close()
1207 self.assertRaises(OSError, os.makedirs, path)
1208 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1209 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1210 os.remove(path)
1211
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001212 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001213 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001214 'dir4', 'dir5', 'dir6')
1215 # If the tests failed, the bottom-most directory ('../dir6')
1216 # may not have been created, so we look for the outermost directory
1217 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001218 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001219 path = os.path.dirname(path)
1220
1221 os.removedirs(path)
1222
Andrew Svetlov405faed2012-12-25 12:18:09 +02001223
R David Murrayf2ad1732014-12-25 18:36:56 -05001224@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1225class ChownFileTests(unittest.TestCase):
1226
Berker Peksag036a71b2015-07-21 09:29:48 +03001227 @classmethod
1228 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001229 os.mkdir(support.TESTFN)
1230
1231 def test_chown_uid_gid_arguments_must_be_index(self):
1232 stat = os.stat(support.TESTFN)
1233 uid = stat.st_uid
1234 gid = stat.st_gid
1235 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1236 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1237 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1238 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1239 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1240
1241 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1242 def test_chown(self):
1243 gid_1, gid_2 = groups[:2]
1244 uid = os.stat(support.TESTFN).st_uid
1245 os.chown(support.TESTFN, uid, gid_1)
1246 gid = os.stat(support.TESTFN).st_gid
1247 self.assertEqual(gid, gid_1)
1248 os.chown(support.TESTFN, uid, gid_2)
1249 gid = os.stat(support.TESTFN).st_gid
1250 self.assertEqual(gid, gid_2)
1251
1252 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1253 "test needs root privilege and more than one user")
1254 def test_chown_with_root(self):
1255 uid_1, uid_2 = all_users[:2]
1256 gid = os.stat(support.TESTFN).st_gid
1257 os.chown(support.TESTFN, uid_1, gid)
1258 uid = os.stat(support.TESTFN).st_uid
1259 self.assertEqual(uid, uid_1)
1260 os.chown(support.TESTFN, uid_2, gid)
1261 uid = os.stat(support.TESTFN).st_uid
1262 self.assertEqual(uid, uid_2)
1263
1264 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1265 "test needs non-root account and more than one user")
1266 def test_chown_without_permission(self):
1267 uid_1, uid_2 = all_users[:2]
1268 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001269 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001270 os.chown(support.TESTFN, uid_1, gid)
1271 os.chown(support.TESTFN, uid_2, gid)
1272
Berker Peksag036a71b2015-07-21 09:29:48 +03001273 @classmethod
1274 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001275 os.rmdir(support.TESTFN)
1276
1277
Andrew Svetlov405faed2012-12-25 12:18:09 +02001278class RemoveDirsTests(unittest.TestCase):
1279 def setUp(self):
1280 os.makedirs(support.TESTFN)
1281
1282 def tearDown(self):
1283 support.rmtree(support.TESTFN)
1284
1285 def test_remove_all(self):
1286 dira = os.path.join(support.TESTFN, 'dira')
1287 os.mkdir(dira)
1288 dirb = os.path.join(dira, 'dirb')
1289 os.mkdir(dirb)
1290 os.removedirs(dirb)
1291 self.assertFalse(os.path.exists(dirb))
1292 self.assertFalse(os.path.exists(dira))
1293 self.assertFalse(os.path.exists(support.TESTFN))
1294
1295 def test_remove_partial(self):
1296 dira = os.path.join(support.TESTFN, 'dira')
1297 os.mkdir(dira)
1298 dirb = os.path.join(dira, 'dirb')
1299 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001300 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001301 os.removedirs(dirb)
1302 self.assertFalse(os.path.exists(dirb))
1303 self.assertTrue(os.path.exists(dira))
1304 self.assertTrue(os.path.exists(support.TESTFN))
1305
1306 def test_remove_nothing(self):
1307 dira = os.path.join(support.TESTFN, 'dira')
1308 os.mkdir(dira)
1309 dirb = os.path.join(dira, 'dirb')
1310 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001311 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001312 with self.assertRaises(OSError):
1313 os.removedirs(dirb)
1314 self.assertTrue(os.path.exists(dirb))
1315 self.assertTrue(os.path.exists(dira))
1316 self.assertTrue(os.path.exists(support.TESTFN))
1317
1318
Guido van Rossume7ba4952007-06-06 23:52:48 +00001319class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001320 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001321 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001322 f.write(b'hello')
1323 f.close()
1324 with open(os.devnull, 'rb') as f:
1325 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001326
Andrew Svetlov405faed2012-12-25 12:18:09 +02001327
Guido van Rossume7ba4952007-06-06 23:52:48 +00001328class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001329 def test_urandom_length(self):
1330 self.assertEqual(len(os.urandom(0)), 0)
1331 self.assertEqual(len(os.urandom(1)), 1)
1332 self.assertEqual(len(os.urandom(10)), 10)
1333 self.assertEqual(len(os.urandom(100)), 100)
1334 self.assertEqual(len(os.urandom(1000)), 1000)
1335
1336 def test_urandom_value(self):
1337 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001338 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001339 data2 = os.urandom(16)
1340 self.assertNotEqual(data1, data2)
1341
1342 def get_urandom_subprocess(self, count):
1343 code = '\n'.join((
1344 'import os, sys',
1345 'data = os.urandom(%s)' % count,
1346 'sys.stdout.buffer.write(data)',
1347 'sys.stdout.buffer.flush()'))
1348 out = assert_python_ok('-c', code)
1349 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001350 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001351 return stdout
1352
1353 def test_urandom_subprocess(self):
1354 data1 = self.get_urandom_subprocess(16)
1355 data2 = self.get_urandom_subprocess(16)
1356 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001357
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001358
Victor Stinner9b1f4742016-09-06 16:18:52 -07001359@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1360class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001361 @classmethod
1362 def setUpClass(cls):
1363 try:
1364 os.getrandom(1)
1365 except OSError as exc:
1366 if exc.errno == errno.ENOSYS:
1367 # Python compiled on a more recent Linux version
1368 # than the current Linux kernel
1369 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1370 else:
1371 raise
1372
Victor Stinner9b1f4742016-09-06 16:18:52 -07001373 def test_getrandom_type(self):
1374 data = os.getrandom(16)
1375 self.assertIsInstance(data, bytes)
1376 self.assertEqual(len(data), 16)
1377
1378 def test_getrandom0(self):
1379 empty = os.getrandom(0)
1380 self.assertEqual(empty, b'')
1381
1382 def test_getrandom_random(self):
1383 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1384
1385 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1386 # resource /dev/random
1387
1388 def test_getrandom_nonblock(self):
1389 # The call must not fail. Check also that the flag exists
1390 try:
1391 os.getrandom(1, os.GRND_NONBLOCK)
1392 except BlockingIOError:
1393 # System urandom is not initialized yet
1394 pass
1395
1396 def test_getrandom_value(self):
1397 data1 = os.getrandom(16)
1398 data2 = os.getrandom(16)
1399 self.assertNotEqual(data1, data2)
1400
1401
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001402# os.urandom() doesn't use a file descriptor when it is implemented with the
1403# getentropy() function, the getrandom() function or the getrandom() syscall
1404OS_URANDOM_DONT_USE_FD = (
1405 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1406 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1407 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001408
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001409@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1410 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001411class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001412 @unittest.skipUnless(resource, "test requires the resource module")
1413 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001414 # Check urandom() failing when it is not able to open /dev/random.
1415 # We spawn a new process to make the test more robust (if getrlimit()
1416 # failed to restore the file descriptor limit after this, the whole
1417 # test suite would crash; this actually happened on the OS X Tiger
1418 # buildbot).
1419 code = """if 1:
1420 import errno
1421 import os
1422 import resource
1423
1424 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1425 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1426 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001427 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001428 except OSError as e:
1429 assert e.errno == errno.EMFILE, e.errno
1430 else:
1431 raise AssertionError("OSError not raised")
1432 """
1433 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001434
Antoine Pitroue472aea2014-04-26 14:33:03 +02001435 def test_urandom_fd_closed(self):
1436 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1437 # closed.
1438 code = """if 1:
1439 import os
1440 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001441 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001442 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001443 with test.support.SuppressCrashReport():
1444 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001445 sys.stdout.buffer.write(os.urandom(4))
1446 """
1447 rc, out, err = assert_python_ok('-Sc', code)
1448
1449 def test_urandom_fd_reopened(self):
1450 # Issue #21207: urandom() should detect its fd to /dev/urandom
1451 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001452 self.addCleanup(support.unlink, support.TESTFN)
1453 create_file(support.TESTFN, b"x" * 256)
1454
Antoine Pitroue472aea2014-04-26 14:33:03 +02001455 code = """if 1:
1456 import os
1457 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001458 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001459 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001460 with test.support.SuppressCrashReport():
1461 for fd in range(3, 256):
1462 try:
1463 os.close(fd)
1464 except OSError:
1465 pass
1466 else:
1467 # Found the urandom fd (XXX hopefully)
1468 break
1469 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001470 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001471 new_fd = f.fileno()
1472 # Issue #26935: posix allows new_fd and fd to be equal but
1473 # some libc implementations have dup2 return an error in this
1474 # case.
1475 if new_fd != fd:
1476 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001477 sys.stdout.buffer.write(os.urandom(4))
1478 sys.stdout.buffer.write(os.urandom(4))
1479 """.format(TESTFN=support.TESTFN)
1480 rc, out, err = assert_python_ok('-Sc', code)
1481 self.assertEqual(len(out), 8)
1482 self.assertNotEqual(out[0:4], out[4:8])
1483 rc, out2, err2 = assert_python_ok('-Sc', code)
1484 self.assertEqual(len(out2), 8)
1485 self.assertNotEqual(out2, out)
1486
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001487
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001488@contextlib.contextmanager
1489def _execvpe_mockup(defpath=None):
1490 """
1491 Stubs out execv and execve functions when used as context manager.
1492 Records exec calls. The mock execv and execve functions always raise an
1493 exception as they would normally never return.
1494 """
1495 # A list of tuples containing (function name, first arg, args)
1496 # of calls to execv or execve that have been made.
1497 calls = []
1498
1499 def mock_execv(name, *args):
1500 calls.append(('execv', name, args))
1501 raise RuntimeError("execv called")
1502
1503 def mock_execve(name, *args):
1504 calls.append(('execve', name, args))
1505 raise OSError(errno.ENOTDIR, "execve called")
1506
1507 try:
1508 orig_execv = os.execv
1509 orig_execve = os.execve
1510 orig_defpath = os.defpath
1511 os.execv = mock_execv
1512 os.execve = mock_execve
1513 if defpath is not None:
1514 os.defpath = defpath
1515 yield calls
1516 finally:
1517 os.execv = orig_execv
1518 os.execve = orig_execve
1519 os.defpath = orig_defpath
1520
Victor Stinner4659ccf2016-09-14 10:57:00 +02001521
Guido van Rossume7ba4952007-06-06 23:52:48 +00001522class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001523 @unittest.skipIf(USING_LINUXTHREADS,
1524 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001525 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001526 self.assertRaises(OSError, os.execvpe, 'no such app-',
1527 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001528
Steve Dowerbce26262016-11-19 19:17:26 -08001529 def test_execv_with_bad_arglist(self):
1530 self.assertRaises(ValueError, os.execv, 'notepad', ())
1531 self.assertRaises(ValueError, os.execv, 'notepad', [])
1532 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1533 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1534
Thomas Heller6790d602007-08-30 17:15:14 +00001535 def test_execvpe_with_bad_arglist(self):
1536 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001537 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1538 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001539
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001540 @unittest.skipUnless(hasattr(os, '_execvpe'),
1541 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001542 def _test_internal_execvpe(self, test_type):
1543 program_path = os.sep + 'absolutepath'
1544 if test_type is bytes:
1545 program = b'executable'
1546 fullpath = os.path.join(os.fsencode(program_path), program)
1547 native_fullpath = fullpath
1548 arguments = [b'progname', 'arg1', 'arg2']
1549 else:
1550 program = 'executable'
1551 arguments = ['progname', 'arg1', 'arg2']
1552 fullpath = os.path.join(program_path, program)
1553 if os.name != "nt":
1554 native_fullpath = os.fsencode(fullpath)
1555 else:
1556 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001557 env = {'spam': 'beans'}
1558
Victor Stinnerb745a742010-05-18 17:17:23 +00001559 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001560 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001561 self.assertRaises(RuntimeError,
1562 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001563 self.assertEqual(len(calls), 1)
1564 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1565
Victor Stinnerb745a742010-05-18 17:17:23 +00001566 # test os._execvpe() with a relative path:
1567 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001568 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001569 self.assertRaises(OSError,
1570 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001571 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001572 self.assertSequenceEqual(calls[0],
1573 ('execve', native_fullpath, (arguments, env)))
1574
1575 # test os._execvpe() with a relative path:
1576 # os.get_exec_path() reads the 'PATH' variable
1577 with _execvpe_mockup() as calls:
1578 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001579 if test_type is bytes:
1580 env_path[b'PATH'] = program_path
1581 else:
1582 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001583 self.assertRaises(OSError,
1584 os._execvpe, program, arguments, env=env_path)
1585 self.assertEqual(len(calls), 1)
1586 self.assertSequenceEqual(calls[0],
1587 ('execve', native_fullpath, (arguments, env_path)))
1588
1589 def test_internal_execvpe_str(self):
1590 self._test_internal_execvpe(str)
1591 if os.name != "nt":
1592 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001593
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001594 def test_execve_invalid_env(self):
1595 args = [sys.executable, '-c', 'pass']
1596
Ville Skyttä49b27342017-08-03 09:00:59 +03001597 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001598 newenv = os.environ.copy()
1599 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1600 with self.assertRaises(ValueError):
1601 os.execve(args[0], args, newenv)
1602
Ville Skyttä49b27342017-08-03 09:00:59 +03001603 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001604 newenv = os.environ.copy()
1605 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1606 with self.assertRaises(ValueError):
1607 os.execve(args[0], args, newenv)
1608
Ville Skyttä49b27342017-08-03 09:00:59 +03001609 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001610 newenv = os.environ.copy()
1611 newenv["FRUIT=ORANGE"] = "lemon"
1612 with self.assertRaises(ValueError):
1613 os.execve(args[0], args, newenv)
1614
Alexey Izbyshev83460312018-10-20 03:28:22 +03001615 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1616 def test_execve_with_empty_path(self):
1617 # bpo-32890: Check GetLastError() misuse
1618 try:
1619 os.execve('', ['arg'], {})
1620 except OSError as e:
1621 self.assertTrue(e.winerror is None or e.winerror != 0)
1622 else:
1623 self.fail('No OSError raised')
1624
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001625
Serhiy Storchaka43767632013-11-03 21:31:38 +02001626@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001627class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001628 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001629 try:
1630 os.stat(support.TESTFN)
1631 except FileNotFoundError:
1632 exists = False
1633 except OSError as exc:
1634 exists = True
1635 self.fail("file %s must not exist; os.stat failed with %s"
1636 % (support.TESTFN, exc))
1637 else:
1638 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001639
Thomas Wouters477c8d52006-05-27 19:21:47 +00001640 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001641 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001642
1643 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001644 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001645
1646 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001647 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001648
1649 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001650 self.addCleanup(support.unlink, support.TESTFN)
1651
Victor Stinnere77c9742016-03-25 10:28:23 +01001652 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001653 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001654
1655 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001656 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001657
Thomas Wouters477c8d52006-05-27 19:21:47 +00001658 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001659 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001660
Victor Stinnere77c9742016-03-25 10:28:23 +01001661
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001663 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1665 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001666 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001667 def get_single(f):
1668 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001669 if hasattr(os, f):
1670 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001671 return helper
1672 for f in singles:
1673 locals()["test_"+f] = get_single(f)
1674
Benjamin Peterson7522c742009-01-19 21:00:09 +00001675 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001676 try:
1677 f(support.make_bad_fd(), *args)
1678 except OSError as e:
1679 self.assertEqual(e.errno, errno.EBADF)
1680 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001681 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001682 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001683
Serhiy Storchaka43767632013-11-03 21:31:38 +02001684 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001685 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001686 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001687
Serhiy Storchaka43767632013-11-03 21:31:38 +02001688 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001689 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 fd = support.make_bad_fd()
1691 # Make sure none of the descriptors we are about to close are
1692 # currently valid (issue 6542).
1693 for i in range(10):
1694 try: os.fstat(fd+i)
1695 except OSError:
1696 pass
1697 else:
1698 break
1699 if i < 2:
1700 raise unittest.SkipTest(
1701 "Unable to acquire a range of invalid file descriptors")
1702 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001703
Serhiy Storchaka43767632013-11-03 21:31:38 +02001704 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001705 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001706 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001707
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001711
Serhiy Storchaka43767632013-11-03 21:31:38 +02001712 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001713 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001714 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001715
Serhiy Storchaka43767632013-11-03 21:31:38 +02001716 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001717 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001718 self.check(os.pathconf, "PC_NAME_MAX")
1719 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001720
Serhiy Storchaka43767632013-11-03 21:31:38 +02001721 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001722 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001723 self.check(os.truncate, 0)
1724 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001725
Serhiy Storchaka43767632013-11-03 21:31:38 +02001726 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001727 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001728 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001729
Serhiy Storchaka43767632013-11-03 21:31:38 +02001730 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001731 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001732 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001733
Victor Stinner57ddf782014-01-08 15:21:28 +01001734 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1735 def test_readv(self):
1736 buf = bytearray(10)
1737 self.check(os.readv, [buf])
1738
Serhiy Storchaka43767632013-11-03 21:31:38 +02001739 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001740 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001741 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001742
Serhiy Storchaka43767632013-11-03 21:31:38 +02001743 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001744 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001745 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001746
Victor Stinner57ddf782014-01-08 15:21:28 +01001747 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1748 def test_writev(self):
1749 self.check(os.writev, [b'abc'])
1750
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001751 def test_inheritable(self):
1752 self.check(os.get_inheritable)
1753 self.check(os.set_inheritable, True)
1754
1755 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1756 'needs os.get_blocking() and os.set_blocking()')
1757 def test_blocking(self):
1758 self.check(os.get_blocking)
1759 self.check(os.set_blocking, True)
1760
Brian Curtin1b9df392010-11-24 20:24:31 +00001761
1762class LinkTests(unittest.TestCase):
1763 def setUp(self):
1764 self.file1 = support.TESTFN
1765 self.file2 = os.path.join(support.TESTFN + "2")
1766
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001767 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001768 for file in (self.file1, self.file2):
1769 if os.path.exists(file):
1770 os.unlink(file)
1771
Brian Curtin1b9df392010-11-24 20:24:31 +00001772 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001773 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001774
xdegaye6a55d092017-11-12 17:57:04 +01001775 try:
1776 os.link(file1, file2)
1777 except PermissionError as e:
1778 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001779 with open(file1, "r") as f1, open(file2, "r") as f2:
1780 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1781
1782 def test_link(self):
1783 self._test_link(self.file1, self.file2)
1784
1785 def test_link_bytes(self):
1786 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1787 bytes(self.file2, sys.getfilesystemencoding()))
1788
Brian Curtinf498b752010-11-30 15:54:04 +00001789 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001790 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001791 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001792 except UnicodeError:
1793 raise unittest.SkipTest("Unable to encode for this platform.")
1794
Brian Curtinf498b752010-11-30 15:54:04 +00001795 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001796 self.file2 = self.file1 + "2"
1797 self._test_link(self.file1, self.file2)
1798
Serhiy Storchaka43767632013-11-03 21:31:38 +02001799@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1800class PosixUidGidTests(unittest.TestCase):
1801 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1802 def test_setuid(self):
1803 if os.getuid() != 0:
1804 self.assertRaises(OSError, os.setuid, 0)
1805 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001806
Serhiy Storchaka43767632013-11-03 21:31:38 +02001807 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1808 def test_setgid(self):
1809 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1810 self.assertRaises(OSError, os.setgid, 0)
1811 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001812
Serhiy Storchaka43767632013-11-03 21:31:38 +02001813 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1814 def test_seteuid(self):
1815 if os.getuid() != 0:
1816 self.assertRaises(OSError, os.seteuid, 0)
1817 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001818
Serhiy Storchaka43767632013-11-03 21:31:38 +02001819 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1820 def test_setegid(self):
1821 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1822 self.assertRaises(OSError, os.setegid, 0)
1823 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001824
Serhiy Storchaka43767632013-11-03 21:31:38 +02001825 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1826 def test_setreuid(self):
1827 if os.getuid() != 0:
1828 self.assertRaises(OSError, os.setreuid, 0, 0)
1829 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1830 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001831
Serhiy Storchaka43767632013-11-03 21:31:38 +02001832 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1833 def test_setreuid_neg1(self):
1834 # Needs to accept -1. We run this in a subprocess to avoid
1835 # altering the test runner's process state (issue8045).
1836 subprocess.check_call([
1837 sys.executable, '-c',
1838 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001839
Serhiy Storchaka43767632013-11-03 21:31:38 +02001840 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1841 def test_setregid(self):
1842 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1843 self.assertRaises(OSError, os.setregid, 0, 0)
1844 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1845 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001846
Serhiy Storchaka43767632013-11-03 21:31:38 +02001847 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1848 def test_setregid_neg1(self):
1849 # Needs to accept -1. We run this in a subprocess to avoid
1850 # altering the test runner's process state (issue8045).
1851 subprocess.check_call([
1852 sys.executable, '-c',
1853 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001854
Serhiy Storchaka43767632013-11-03 21:31:38 +02001855@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1856class Pep383Tests(unittest.TestCase):
1857 def setUp(self):
1858 if support.TESTFN_UNENCODABLE:
1859 self.dir = support.TESTFN_UNENCODABLE
1860 elif support.TESTFN_NONASCII:
1861 self.dir = support.TESTFN_NONASCII
1862 else:
1863 self.dir = support.TESTFN
1864 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001865
Serhiy Storchaka43767632013-11-03 21:31:38 +02001866 bytesfn = []
1867 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001868 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001869 fn = os.fsencode(fn)
1870 except UnicodeEncodeError:
1871 return
1872 bytesfn.append(fn)
1873 add_filename(support.TESTFN_UNICODE)
1874 if support.TESTFN_UNENCODABLE:
1875 add_filename(support.TESTFN_UNENCODABLE)
1876 if support.TESTFN_NONASCII:
1877 add_filename(support.TESTFN_NONASCII)
1878 if not bytesfn:
1879 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001880
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881 self.unicodefn = set()
1882 os.mkdir(self.dir)
1883 try:
1884 for fn in bytesfn:
1885 support.create_empty_file(os.path.join(self.bdir, fn))
1886 fn = os.fsdecode(fn)
1887 if fn in self.unicodefn:
1888 raise ValueError("duplicate filename")
1889 self.unicodefn.add(fn)
1890 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001891 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001892 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001893
Serhiy Storchaka43767632013-11-03 21:31:38 +02001894 def tearDown(self):
1895 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001896
Serhiy Storchaka43767632013-11-03 21:31:38 +02001897 def test_listdir(self):
1898 expected = self.unicodefn
1899 found = set(os.listdir(self.dir))
1900 self.assertEqual(found, expected)
1901 # test listdir without arguments
1902 current_directory = os.getcwd()
1903 try:
1904 os.chdir(os.sep)
1905 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1906 finally:
1907 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001908
Serhiy Storchaka43767632013-11-03 21:31:38 +02001909 def test_open(self):
1910 for fn in self.unicodefn:
1911 f = open(os.path.join(self.dir, fn), 'rb')
1912 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001913
Serhiy Storchaka43767632013-11-03 21:31:38 +02001914 @unittest.skipUnless(hasattr(os, 'statvfs'),
1915 "need os.statvfs()")
1916 def test_statvfs(self):
1917 # issue #9645
1918 for fn in self.unicodefn:
1919 # should not fail with file not found error
1920 fullname = os.path.join(self.dir, fn)
1921 os.statvfs(fullname)
1922
1923 def test_stat(self):
1924 for fn in self.unicodefn:
1925 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001926
Brian Curtineb24d742010-04-12 17:16:38 +00001927@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1928class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001929 def _kill(self, sig):
1930 # Start sys.executable as a subprocess and communicate from the
1931 # subprocess to the parent that the interpreter is ready. When it
1932 # becomes ready, send *sig* via os.kill to the subprocess and check
1933 # that the return code is equal to *sig*.
1934 import ctypes
1935 from ctypes import wintypes
1936 import msvcrt
1937
1938 # Since we can't access the contents of the process' stdout until the
1939 # process has exited, use PeekNamedPipe to see what's inside stdout
1940 # without waiting. This is done so we can tell that the interpreter
1941 # is started and running at a point where it could handle a signal.
1942 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1943 PeekNamedPipe.restype = wintypes.BOOL
1944 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1945 ctypes.POINTER(ctypes.c_char), # stdout buf
1946 wintypes.DWORD, # Buffer size
1947 ctypes.POINTER(wintypes.DWORD), # bytes read
1948 ctypes.POINTER(wintypes.DWORD), # bytes avail
1949 ctypes.POINTER(wintypes.DWORD)) # bytes left
1950 msg = "running"
1951 proc = subprocess.Popen([sys.executable, "-c",
1952 "import sys;"
1953 "sys.stdout.write('{}');"
1954 "sys.stdout.flush();"
1955 "input()".format(msg)],
1956 stdout=subprocess.PIPE,
1957 stderr=subprocess.PIPE,
1958 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001959 self.addCleanup(proc.stdout.close)
1960 self.addCleanup(proc.stderr.close)
1961 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001962
1963 count, max = 0, 100
1964 while count < max and proc.poll() is None:
1965 # Create a string buffer to store the result of stdout from the pipe
1966 buf = ctypes.create_string_buffer(len(msg))
1967 # Obtain the text currently in proc.stdout
1968 # Bytes read/avail/left are left as NULL and unused
1969 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1970 buf, ctypes.sizeof(buf), None, None, None)
1971 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1972 if buf.value:
1973 self.assertEqual(msg, buf.value.decode())
1974 break
1975 time.sleep(0.1)
1976 count += 1
1977 else:
1978 self.fail("Did not receive communication from the subprocess")
1979
Brian Curtineb24d742010-04-12 17:16:38 +00001980 os.kill(proc.pid, sig)
1981 self.assertEqual(proc.wait(), sig)
1982
1983 def test_kill_sigterm(self):
1984 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001985 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001986
1987 def test_kill_int(self):
1988 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001989 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001990
1991 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001992 tagname = "test_os_%s" % uuid.uuid1()
1993 m = mmap.mmap(-1, 1, tagname)
1994 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001995 # Run a script which has console control handling enabled.
1996 proc = subprocess.Popen([sys.executable,
1997 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001998 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001999 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2000 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002001 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002002 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00002003 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002004 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002005 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002006 count += 1
2007 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00002008 # Forcefully kill the process if we weren't able to signal it.
2009 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002010 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002011 os.kill(proc.pid, event)
2012 # proc.send_signal(event) could also be done here.
2013 # Allow time for the signal to be passed and the process to exit.
2014 time.sleep(0.5)
2015 if not proc.poll():
2016 # Forcefully kill the process if we weren't able to signal it.
2017 os.kill(proc.pid, signal.SIGINT)
2018 self.fail("subprocess did not stop on {}".format(name))
2019
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002020 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002021 def test_CTRL_C_EVENT(self):
2022 from ctypes import wintypes
2023 import ctypes
2024
2025 # Make a NULL value by creating a pointer with no argument.
2026 NULL = ctypes.POINTER(ctypes.c_int)()
2027 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2028 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2029 wintypes.BOOL)
2030 SetConsoleCtrlHandler.restype = wintypes.BOOL
2031
2032 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002033 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002034 # by subprocesses.
2035 SetConsoleCtrlHandler(NULL, 0)
2036
2037 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2038
2039 def test_CTRL_BREAK_EVENT(self):
2040 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2041
2042
Brian Curtind40e6f72010-07-08 21:39:08 +00002043@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002044class Win32ListdirTests(unittest.TestCase):
2045 """Test listdir on Windows."""
2046
2047 def setUp(self):
2048 self.created_paths = []
2049 for i in range(2):
2050 dir_name = 'SUB%d' % i
2051 dir_path = os.path.join(support.TESTFN, dir_name)
2052 file_name = 'FILE%d' % i
2053 file_path = os.path.join(support.TESTFN, file_name)
2054 os.makedirs(dir_path)
2055 with open(file_path, 'w') as f:
2056 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2057 self.created_paths.extend([dir_name, file_name])
2058 self.created_paths.sort()
2059
2060 def tearDown(self):
2061 shutil.rmtree(support.TESTFN)
2062
2063 def test_listdir_no_extended_path(self):
2064 """Test when the path is not an "extended" path."""
2065 # unicode
2066 self.assertEqual(
2067 sorted(os.listdir(support.TESTFN)),
2068 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002069
Tim Golden781bbeb2013-10-25 20:24:06 +01002070 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002071 self.assertEqual(
2072 sorted(os.listdir(os.fsencode(support.TESTFN))),
2073 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002074
2075 def test_listdir_extended_path(self):
2076 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002077 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002078 # unicode
2079 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2080 self.assertEqual(
2081 sorted(os.listdir(path)),
2082 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002083
Tim Golden781bbeb2013-10-25 20:24:06 +01002084 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002085 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2086 self.assertEqual(
2087 sorted(os.listdir(path)),
2088 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002089
2090
Berker Peksage0b5b202018-08-15 13:03:41 +03002091@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2092class ReadlinkTests(unittest.TestCase):
2093 filelink = 'readlinktest'
2094 filelink_target = os.path.abspath(__file__)
2095 filelinkb = os.fsencode(filelink)
2096 filelinkb_target = os.fsencode(filelink_target)
2097
2098 def setUp(self):
2099 self.assertTrue(os.path.exists(self.filelink_target))
2100 self.assertTrue(os.path.exists(self.filelinkb_target))
2101 self.assertFalse(os.path.exists(self.filelink))
2102 self.assertFalse(os.path.exists(self.filelinkb))
2103
2104 def test_not_symlink(self):
2105 filelink_target = FakePath(self.filelink_target)
2106 self.assertRaises(OSError, os.readlink, self.filelink_target)
2107 self.assertRaises(OSError, os.readlink, filelink_target)
2108
2109 def test_missing_link(self):
2110 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2111 self.assertRaises(FileNotFoundError, os.readlink,
2112 FakePath('missing-link'))
2113
2114 @support.skip_unless_symlink
2115 def test_pathlike(self):
2116 os.symlink(self.filelink_target, self.filelink)
2117 self.addCleanup(support.unlink, self.filelink)
2118 filelink = FakePath(self.filelink)
2119 self.assertEqual(os.readlink(filelink), self.filelink_target)
2120
2121 @support.skip_unless_symlink
2122 def test_pathlike_bytes(self):
2123 os.symlink(self.filelinkb_target, self.filelinkb)
2124 self.addCleanup(support.unlink, self.filelinkb)
2125 path = os.readlink(FakePath(self.filelinkb))
2126 self.assertEqual(path, self.filelinkb_target)
2127 self.assertIsInstance(path, bytes)
2128
2129 @support.skip_unless_symlink
2130 def test_bytes(self):
2131 os.symlink(self.filelinkb_target, self.filelinkb)
2132 self.addCleanup(support.unlink, self.filelinkb)
2133 path = os.readlink(self.filelinkb)
2134 self.assertEqual(path, self.filelinkb_target)
2135 self.assertIsInstance(path, bytes)
2136
2137
Tim Golden781bbeb2013-10-25 20:24:06 +01002138@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002139@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002140class Win32SymlinkTests(unittest.TestCase):
2141 filelink = 'filelinktest'
2142 filelink_target = os.path.abspath(__file__)
2143 dirlink = 'dirlinktest'
2144 dirlink_target = os.path.dirname(filelink_target)
2145 missing_link = 'missing link'
2146
2147 def setUp(self):
2148 assert os.path.exists(self.dirlink_target)
2149 assert os.path.exists(self.filelink_target)
2150 assert not os.path.exists(self.dirlink)
2151 assert not os.path.exists(self.filelink)
2152 assert not os.path.exists(self.missing_link)
2153
2154 def tearDown(self):
2155 if os.path.exists(self.filelink):
2156 os.remove(self.filelink)
2157 if os.path.exists(self.dirlink):
2158 os.rmdir(self.dirlink)
2159 if os.path.lexists(self.missing_link):
2160 os.remove(self.missing_link)
2161
2162 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002163 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002164 self.assertTrue(os.path.exists(self.dirlink))
2165 self.assertTrue(os.path.isdir(self.dirlink))
2166 self.assertTrue(os.path.islink(self.dirlink))
2167 self.check_stat(self.dirlink, self.dirlink_target)
2168
2169 def test_file_link(self):
2170 os.symlink(self.filelink_target, self.filelink)
2171 self.assertTrue(os.path.exists(self.filelink))
2172 self.assertTrue(os.path.isfile(self.filelink))
2173 self.assertTrue(os.path.islink(self.filelink))
2174 self.check_stat(self.filelink, self.filelink_target)
2175
2176 def _create_missing_dir_link(self):
2177 'Create a "directory" link to a non-existent target'
2178 linkname = self.missing_link
2179 if os.path.lexists(linkname):
2180 os.remove(linkname)
2181 target = r'c:\\target does not exist.29r3c740'
2182 assert not os.path.exists(target)
2183 target_is_dir = True
2184 os.symlink(target, linkname, target_is_dir)
2185
2186 def test_remove_directory_link_to_missing_target(self):
2187 self._create_missing_dir_link()
2188 # For compatibility with Unix, os.remove will check the
2189 # directory status and call RemoveDirectory if the symlink
2190 # was created with target_is_dir==True.
2191 os.remove(self.missing_link)
2192
2193 @unittest.skip("currently fails; consider for improvement")
2194 def test_isdir_on_directory_link_to_missing_target(self):
2195 self._create_missing_dir_link()
2196 # consider having isdir return true for directory links
2197 self.assertTrue(os.path.isdir(self.missing_link))
2198
2199 @unittest.skip("currently fails; consider for improvement")
2200 def test_rmdir_on_directory_link_to_missing_target(self):
2201 self._create_missing_dir_link()
2202 # consider allowing rmdir to remove directory links
2203 os.rmdir(self.missing_link)
2204
2205 def check_stat(self, link, target):
2206 self.assertEqual(os.stat(link), os.stat(target))
2207 self.assertNotEqual(os.lstat(link), os.stat(link))
2208
Brian Curtind25aef52011-06-13 15:16:04 -05002209 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002210 self.assertEqual(os.stat(bytes_link), os.stat(target))
2211 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002212
2213 def test_12084(self):
2214 level1 = os.path.abspath(support.TESTFN)
2215 level2 = os.path.join(level1, "level2")
2216 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002217 self.addCleanup(support.rmtree, level1)
2218
2219 os.mkdir(level1)
2220 os.mkdir(level2)
2221 os.mkdir(level3)
2222
2223 file1 = os.path.abspath(os.path.join(level1, "file1"))
2224 create_file(file1)
2225
2226 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002227 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002228 os.chdir(level2)
2229 link = os.path.join(level2, "link")
2230 os.symlink(os.path.relpath(file1), "link")
2231 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002232
Victor Stinnerae39d232016-03-24 17:12:55 +01002233 # Check os.stat calls from the same dir as the link
2234 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002235
Victor Stinnerae39d232016-03-24 17:12:55 +01002236 # Check os.stat calls from a dir below the link
2237 os.chdir(level1)
2238 self.assertEqual(os.stat(file1),
2239 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002240
Victor Stinnerae39d232016-03-24 17:12:55 +01002241 # Check os.stat calls from a dir above the link
2242 os.chdir(level3)
2243 self.assertEqual(os.stat(file1),
2244 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002245 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002246 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002247
SSE43c34aad2018-02-13 00:10:35 +07002248 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2249 and os.path.exists(r'C:\ProgramData'),
2250 'Test directories not found')
2251 def test_29248(self):
2252 # os.symlink() calls CreateSymbolicLink, which creates
2253 # the reparse data buffer with the print name stored
2254 # first, so the offset is always 0. CreateSymbolicLink
2255 # stores the "PrintName" DOS path (e.g. "C:\") first,
2256 # with an offset of 0, followed by the "SubstituteName"
2257 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2258 # the other hand, seems to have been created manually
2259 # with an inverted order.
2260 target = os.readlink(r'C:\Users\All Users')
2261 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2262
Steve Dower6921e732018-03-05 14:26:08 -08002263 def test_buffer_overflow(self):
2264 # Older versions would have a buffer overflow when detecting
2265 # whether a link source was a directory. This test ensures we
2266 # no longer crash, but does not otherwise validate the behavior
2267 segment = 'X' * 27
2268 path = os.path.join(*[segment] * 10)
2269 test_cases = [
2270 # overflow with absolute src
2271 ('\\' + path, segment),
2272 # overflow dest with relative src
2273 (segment, path),
2274 # overflow when joining src
2275 (path[:180], path[:180]),
2276 ]
2277 for src, dest in test_cases:
2278 try:
2279 os.symlink(src, dest)
2280 except FileNotFoundError:
2281 pass
2282 else:
2283 try:
2284 os.remove(dest)
2285 except OSError:
2286 pass
2287 # Also test with bytes, since that is a separate code path.
2288 try:
2289 os.symlink(os.fsencode(src), os.fsencode(dest))
2290 except FileNotFoundError:
2291 pass
2292 else:
2293 try:
2294 os.remove(dest)
2295 except OSError:
2296 pass
Brian Curtind40e6f72010-07-08 21:39:08 +00002297
Tim Golden0321cf22014-05-05 19:46:17 +01002298@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2299class Win32JunctionTests(unittest.TestCase):
2300 junction = 'junctiontest'
2301 junction_target = os.path.dirname(os.path.abspath(__file__))
2302
2303 def setUp(self):
2304 assert os.path.exists(self.junction_target)
2305 assert not os.path.exists(self.junction)
2306
2307 def tearDown(self):
2308 if os.path.exists(self.junction):
2309 # os.rmdir delegates to Windows' RemoveDirectoryW,
2310 # which removes junction points safely.
2311 os.rmdir(self.junction)
2312
2313 def test_create_junction(self):
2314 _winapi.CreateJunction(self.junction_target, self.junction)
2315 self.assertTrue(os.path.exists(self.junction))
2316 self.assertTrue(os.path.isdir(self.junction))
2317
2318 # Junctions are not recognized as links.
2319 self.assertFalse(os.path.islink(self.junction))
2320
2321 def test_unlink_removes_junction(self):
2322 _winapi.CreateJunction(self.junction_target, self.junction)
2323 self.assertTrue(os.path.exists(self.junction))
2324
2325 os.unlink(self.junction)
2326 self.assertFalse(os.path.exists(self.junction))
2327
Mark Becwarb82bfac2019-02-02 16:08:23 -05002328@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2329class Win32NtTests(unittest.TestCase):
2330 def setUp(self):
2331 from test import support
2332 self.nt = support.import_module('nt')
2333 pass
2334
2335 def tearDown(self):
2336 pass
2337
2338 def test_getfinalpathname_handles(self):
2339 try:
2340 import ctypes, ctypes.wintypes
2341 except ImportError:
2342 raise unittest.SkipTest('ctypes module is required for this test')
2343
2344 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2345 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2346
2347 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2348 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2349 ctypes.wintypes.LPDWORD)
2350
2351 # This is a pseudo-handle that doesn't need to be closed
2352 hproc = kernel.GetCurrentProcess()
2353
2354 handle_count = ctypes.wintypes.DWORD()
2355 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2356 self.assertEqual(1, ok)
2357
2358 before_count = handle_count.value
2359
2360 # The first two test the error path, __file__ tests the success path
2361 filenames = [ r'\\?\C:',
2362 r'\\?\NUL',
2363 r'\\?\CONIN',
2364 __file__ ]
2365
2366 for i in range(10):
2367 for name in filenames:
2368 try:
2369 tmp = self.nt._getfinalpathname(name)
2370 except:
2371 # Failure is expected
2372 pass
2373 try:
2374 tmp = os.stat(name)
2375 except:
2376 pass
2377
2378 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2379 self.assertEqual(1, ok)
2380
2381 handle_delta = handle_count.value - before_count
2382
2383 self.assertEqual(0, handle_delta)
Tim Golden0321cf22014-05-05 19:46:17 +01002384
Jason R. Coombs3a092862013-05-27 23:21:28 -04002385@support.skip_unless_symlink
2386class NonLocalSymlinkTests(unittest.TestCase):
2387
2388 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002389 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002390 Create this structure:
2391
2392 base
2393 \___ some_dir
2394 """
2395 os.makedirs('base/some_dir')
2396
2397 def tearDown(self):
2398 shutil.rmtree('base')
2399
2400 def test_directory_link_nonlocal(self):
2401 """
2402 The symlink target should resolve relative to the link, not relative
2403 to the current directory.
2404
2405 Then, link base/some_link -> base/some_dir and ensure that some_link
2406 is resolved as a directory.
2407
2408 In issue13772, it was discovered that directory detection failed if
2409 the symlink target was not specified relative to the current
2410 directory, which was a defect in the implementation.
2411 """
2412 src = os.path.join('base', 'some_link')
2413 os.symlink('some_dir', src)
2414 assert os.path.isdir(src)
2415
2416
Victor Stinnere8d51452010-08-19 01:05:19 +00002417class FSEncodingTests(unittest.TestCase):
2418 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002419 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2420 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002421
Victor Stinnere8d51452010-08-19 01:05:19 +00002422 def test_identity(self):
2423 # assert fsdecode(fsencode(x)) == x
2424 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2425 try:
2426 bytesfn = os.fsencode(fn)
2427 except UnicodeEncodeError:
2428 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002430
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002431
Brett Cannonefb00c02012-02-29 18:31:31 -05002432
2433class DeviceEncodingTests(unittest.TestCase):
2434
2435 def test_bad_fd(self):
2436 # Return None when an fd doesn't actually exist.
2437 self.assertIsNone(os.device_encoding(123456))
2438
Philip Jenveye308b7c2012-02-29 16:16:15 -08002439 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2440 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002441 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002442 def test_device_encoding(self):
2443 encoding = os.device_encoding(0)
2444 self.assertIsNotNone(encoding)
2445 self.assertTrue(codecs.lookup(encoding))
2446
2447
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002448class PidTests(unittest.TestCase):
2449 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2450 def test_getppid(self):
2451 p = subprocess.Popen([sys.executable, '-c',
2452 'import os; print(os.getppid())'],
2453 stdout=subprocess.PIPE)
2454 stdout, _ = p.communicate()
2455 # We are the parent of our subprocess
2456 self.assertEqual(int(stdout), os.getpid())
2457
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002458 def test_waitpid(self):
2459 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002460 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002461 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002462 status = os.waitpid(pid, 0)
2463 self.assertEqual(status, (pid, 0))
2464
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002465
Victor Stinner4659ccf2016-09-14 10:57:00 +02002466class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002467 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002468 self.exitcode = 17
2469
2470 filename = support.TESTFN
2471 self.addCleanup(support.unlink, filename)
2472
2473 if not with_env:
2474 code = 'import sys; sys.exit(%s)' % self.exitcode
2475 else:
2476 self.env = dict(os.environ)
2477 # create an unique key
2478 self.key = str(uuid.uuid4())
2479 self.env[self.key] = self.key
2480 # read the variable from os.environ to check that it exists
2481 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2482 % (self.key, self.exitcode))
2483
2484 with open(filename, "w") as fp:
2485 fp.write(code)
2486
Berker Peksag81816462016-09-15 20:19:47 +03002487 args = [sys.executable, filename]
2488 if use_bytes:
2489 args = [os.fsencode(a) for a in args]
2490 self.env = {os.fsencode(k): os.fsencode(v)
2491 for k, v in self.env.items()}
2492
2493 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002494
Berker Peksag4af23d72016-09-15 20:32:44 +03002495 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002496 def test_spawnl(self):
2497 args = self.create_args()
2498 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2499 self.assertEqual(exitcode, self.exitcode)
2500
Berker Peksag4af23d72016-09-15 20:32:44 +03002501 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002502 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002503 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002504 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2505 self.assertEqual(exitcode, self.exitcode)
2506
Berker Peksag4af23d72016-09-15 20:32:44 +03002507 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002508 def test_spawnlp(self):
2509 args = self.create_args()
2510 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2511 self.assertEqual(exitcode, self.exitcode)
2512
Berker Peksag4af23d72016-09-15 20:32:44 +03002513 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002514 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002515 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002516 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2517 self.assertEqual(exitcode, self.exitcode)
2518
Berker Peksag4af23d72016-09-15 20:32:44 +03002519 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002520 def test_spawnv(self):
2521 args = self.create_args()
2522 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2523 self.assertEqual(exitcode, self.exitcode)
2524
Berker Peksag4af23d72016-09-15 20:32:44 +03002525 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002526 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002527 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002528 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2529 self.assertEqual(exitcode, self.exitcode)
2530
Berker Peksag4af23d72016-09-15 20:32:44 +03002531 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002532 def test_spawnvp(self):
2533 args = self.create_args()
2534 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2535 self.assertEqual(exitcode, self.exitcode)
2536
Berker Peksag4af23d72016-09-15 20:32:44 +03002537 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002538 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002539 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002540 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2541 self.assertEqual(exitcode, self.exitcode)
2542
Berker Peksag4af23d72016-09-15 20:32:44 +03002543 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002544 def test_nowait(self):
2545 args = self.create_args()
2546 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2547 result = os.waitpid(pid, 0)
2548 self.assertEqual(result[0], pid)
2549 status = result[1]
2550 if hasattr(os, 'WIFEXITED'):
2551 self.assertTrue(os.WIFEXITED(status))
2552 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2553 else:
2554 self.assertEqual(status, self.exitcode << 8)
2555
Berker Peksag4af23d72016-09-15 20:32:44 +03002556 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002557 def test_spawnve_bytes(self):
2558 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2559 args = self.create_args(with_env=True, use_bytes=True)
2560 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2561 self.assertEqual(exitcode, self.exitcode)
2562
Steve Dower859fd7b2016-11-19 18:53:19 -08002563 @requires_os_func('spawnl')
2564 def test_spawnl_noargs(self):
2565 args = self.create_args()
2566 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002567 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002568
2569 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002570 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002571 args = self.create_args()
2572 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002573 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002574
2575 @requires_os_func('spawnv')
2576 def test_spawnv_noargs(self):
2577 args = self.create_args()
2578 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2579 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002580 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2581 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002582
2583 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002584 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002585 args = self.create_args()
2586 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2587 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002588 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2589 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002590
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002591 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002592 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002593
Ville Skyttä49b27342017-08-03 09:00:59 +03002594 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002595 newenv = os.environ.copy()
2596 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2597 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002598 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002599 except ValueError:
2600 pass
2601 else:
2602 self.assertEqual(exitcode, 127)
2603
Ville Skyttä49b27342017-08-03 09:00:59 +03002604 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002605 newenv = os.environ.copy()
2606 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2607 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002608 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002609 except ValueError:
2610 pass
2611 else:
2612 self.assertEqual(exitcode, 127)
2613
Ville Skyttä49b27342017-08-03 09:00:59 +03002614 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002615 newenv = os.environ.copy()
2616 newenv["FRUIT=ORANGE"] = "lemon"
2617 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002618 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002619 except ValueError:
2620 pass
2621 else:
2622 self.assertEqual(exitcode, 127)
2623
Ville Skyttä49b27342017-08-03 09:00:59 +03002624 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002625 filename = support.TESTFN
2626 self.addCleanup(support.unlink, filename)
2627 with open(filename, "w") as fp:
2628 fp.write('import sys, os\n'
2629 'if os.getenv("FRUIT") != "orange=lemon":\n'
2630 ' raise AssertionError')
2631 args = [sys.executable, filename]
2632 newenv = os.environ.copy()
2633 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002634 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002635 self.assertEqual(exitcode, 0)
2636
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002637 @requires_os_func('spawnve')
2638 def test_spawnve_invalid_env(self):
2639 self._test_invalid_env(os.spawnve)
2640
2641 @requires_os_func('spawnvpe')
2642 def test_spawnvpe_invalid_env(self):
2643 self._test_invalid_env(os.spawnvpe)
2644
Serhiy Storchaka77703942017-06-25 07:33:01 +03002645
Brian Curtin0151b8e2010-09-24 13:43:43 +00002646# The introduction of this TestCase caused at least two different errors on
2647# *nix buildbots. Temporarily skip this to let the buildbots move along.
2648@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002649@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2650class LoginTests(unittest.TestCase):
2651 def test_getlogin(self):
2652 user_name = os.getlogin()
2653 self.assertNotEqual(len(user_name), 0)
2654
2655
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002656@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2657 "needs os.getpriority and os.setpriority")
2658class ProgramPriorityTests(unittest.TestCase):
2659 """Tests for os.getpriority() and os.setpriority()."""
2660
2661 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002662
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002663 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2664 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2665 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002666 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2667 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002668 raise unittest.SkipTest("unable to reliably test setpriority "
2669 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002670 else:
2671 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002672 finally:
2673 try:
2674 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2675 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002676 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002677 raise
2678
2679
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002680class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002681
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002682 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002683
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002684 def __init__(self, conn):
2685 asynchat.async_chat.__init__(self, conn)
2686 self.in_buffer = []
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002687 self.accumulate = True
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002688 self.closed = False
2689 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002690
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002691 def handle_read(self):
2692 data = self.recv(4096)
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002693 if self.accumulate:
2694 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002695
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002696 def get_data(self):
2697 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002698
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002699 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002700 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002701 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002702
2703 def handle_error(self):
2704 raise
2705
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 def __init__(self, address):
2707 threading.Thread.__init__(self)
2708 asyncore.dispatcher.__init__(self)
2709 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2710 self.bind(address)
2711 self.listen(5)
2712 self.host, self.port = self.socket.getsockname()[:2]
2713 self.handler_instance = None
2714 self._active = False
2715 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002716
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002717 # --- public API
2718
2719 @property
2720 def running(self):
2721 return self._active
2722
2723 def start(self):
2724 assert not self.running
2725 self.__flag = threading.Event()
2726 threading.Thread.start(self)
2727 self.__flag.wait()
2728
2729 def stop(self):
2730 assert self.running
2731 self._active = False
2732 self.join()
2733
2734 def wait(self):
2735 # wait for handler connection to be closed, then stop the server
2736 while not getattr(self.handler_instance, "closed", False):
2737 time.sleep(0.001)
2738 self.stop()
2739
2740 # --- internals
2741
2742 def run(self):
2743 self._active = True
2744 self.__flag.set()
2745 while self._active and asyncore.socket_map:
2746 self._active_lock.acquire()
2747 asyncore.loop(timeout=0.001, count=1)
2748 self._active_lock.release()
2749 asyncore.close_all()
2750
2751 def handle_accept(self):
2752 conn, addr = self.accept()
2753 self.handler_instance = self.Handler(conn)
2754
2755 def handle_connect(self):
2756 self.close()
2757 handle_read = handle_connect
2758
2759 def writable(self):
2760 return 0
2761
2762 def handle_error(self):
2763 raise
2764
2765
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002766@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2767class TestSendfile(unittest.TestCase):
2768
Victor Stinner8c663fd2017-11-08 14:44:44 -08002769 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002770 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002771 not sys.platform.startswith("solaris") and \
2772 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002773 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2774 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002775 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2776 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002777
2778 @classmethod
2779 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002780 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002781 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002782
2783 @classmethod
2784 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002785 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002786 support.unlink(support.TESTFN)
2787
2788 def setUp(self):
2789 self.server = SendfileTestServer((support.HOST, 0))
2790 self.server.start()
2791 self.client = socket.socket()
2792 self.client.connect((self.server.host, self.server.port))
2793 self.client.settimeout(1)
2794 # synchronize by waiting for "220 ready" response
2795 self.client.recv(1024)
2796 self.sockno = self.client.fileno()
2797 self.file = open(support.TESTFN, 'rb')
2798 self.fileno = self.file.fileno()
2799
2800 def tearDown(self):
2801 self.file.close()
2802 self.client.close()
2803 if self.server.running:
2804 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002805 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002806
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002807 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002808 """A higher level wrapper representing how an application is
2809 supposed to use sendfile().
2810 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002811 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002812 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002813 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002814 except OSError as err:
2815 if err.errno == errno.ECONNRESET:
2816 # disconnected
2817 raise
2818 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2819 # we have to retry send data
2820 continue
2821 else:
2822 raise
2823
2824 def test_send_whole_file(self):
2825 # normal send
2826 total_sent = 0
2827 offset = 0
2828 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002829 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002830 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2831 if sent == 0:
2832 break
2833 offset += sent
2834 total_sent += sent
2835 self.assertTrue(sent <= nbytes)
2836 self.assertEqual(offset, total_sent)
2837
2838 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002839 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002840 self.client.close()
2841 self.server.wait()
2842 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002843 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002844 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002845
2846 def test_send_at_certain_offset(self):
2847 # start sending a file at a certain offset
2848 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002849 offset = len(self.DATA) // 2
2850 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002851 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002852 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002853 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2854 if sent == 0:
2855 break
2856 offset += sent
2857 total_sent += sent
2858 self.assertTrue(sent <= nbytes)
2859
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002860 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002861 self.client.close()
2862 self.server.wait()
2863 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002864 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002865 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002866 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002867 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002868
2869 def test_offset_overflow(self):
2870 # specify an offset > file size
2871 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002872 try:
2873 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2874 except OSError as e:
2875 # Solaris can raise EINVAL if offset >= file length, ignore.
2876 if e.errno != errno.EINVAL:
2877 raise
2878 else:
2879 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002880 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002881 self.client.close()
2882 self.server.wait()
2883 data = self.server.handler_instance.get_data()
2884 self.assertEqual(data, b'')
2885
2886 def test_invalid_offset(self):
2887 with self.assertRaises(OSError) as cm:
2888 os.sendfile(self.sockno, self.fileno, -1, 4096)
2889 self.assertEqual(cm.exception.errno, errno.EINVAL)
2890
Martin Panterbf19d162015-09-09 01:01:13 +00002891 def test_keywords(self):
2892 # Keyword arguments should be supported
2893 os.sendfile(out=self.sockno, offset=0, count=4096,
2894 **{'in': self.fileno})
2895 if self.SUPPORT_HEADERS_TRAILERS:
2896 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002897 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002898
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002899 # --- headers / trailers tests
2900
Serhiy Storchaka43767632013-11-03 21:31:38 +02002901 @requires_headers_trailers
2902 def test_headers(self):
2903 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002904 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02002905 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002906 headers=[b"x" * 512, b"y" * 256])
2907 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002908 total_sent += sent
2909 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002910 while total_sent < len(expected_data):
2911 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002912 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2913 offset, nbytes)
2914 if sent == 0:
2915 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002916 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002917 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002918 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002919
Serhiy Storchaka43767632013-11-03 21:31:38 +02002920 self.assertEqual(total_sent, len(expected_data))
2921 self.client.close()
2922 self.server.wait()
2923 data = self.server.handler_instance.get_data()
2924 self.assertEqual(hash(data), hash(expected_data))
2925
2926 @requires_headers_trailers
2927 def test_trailers(self):
2928 TESTFN2 = support.TESTFN + "2"
2929 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002930
2931 self.addCleanup(support.unlink, TESTFN2)
2932 create_file(TESTFN2, file_data)
2933
2934 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002935 os.sendfile(self.sockno, f.fileno(), 0, 5,
2936 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002937 self.client.close()
2938 self.server.wait()
2939 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002940 self.assertEqual(data, b"abcde123456789")
2941
2942 @requires_headers_trailers
2943 @requires_32b
2944 def test_headers_overflow_32bits(self):
2945 self.server.handler_instance.accumulate = False
2946 with self.assertRaises(OSError) as cm:
2947 os.sendfile(self.sockno, self.fileno, 0, 0,
2948 headers=[b"x" * 2**16] * 2**15)
2949 self.assertEqual(cm.exception.errno, errno.EINVAL)
2950
2951 @requires_headers_trailers
2952 @requires_32b
2953 def test_trailers_overflow_32bits(self):
2954 self.server.handler_instance.accumulate = False
2955 with self.assertRaises(OSError) as cm:
2956 os.sendfile(self.sockno, self.fileno, 0, 0,
2957 trailers=[b"x" * 2**16] * 2**15)
2958 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002959
Serhiy Storchaka43767632013-11-03 21:31:38 +02002960 @requires_headers_trailers
2961 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2962 'test needs os.SF_NODISKIO')
2963 def test_flags(self):
2964 try:
2965 os.sendfile(self.sockno, self.fileno, 0, 4096,
2966 flags=os.SF_NODISKIO)
2967 except OSError as err:
2968 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2969 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002970
2971
Larry Hastings9cf065c2012-06-22 16:30:09 -07002972def supports_extended_attributes():
2973 if not hasattr(os, "setxattr"):
2974 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002975
Larry Hastings9cf065c2012-06-22 16:30:09 -07002976 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002977 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002978 try:
2979 os.setxattr(fp.fileno(), b"user.test", b"")
2980 except OSError:
2981 return False
2982 finally:
2983 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002984
2985 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002986
2987
2988@unittest.skipUnless(supports_extended_attributes(),
2989 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002990# Kernels < 2.6.39 don't respect setxattr flags.
2991@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002992class ExtendedAttributeTests(unittest.TestCase):
2993
Larry Hastings9cf065c2012-06-22 16:30:09 -07002994 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002995 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002996 self.addCleanup(support.unlink, fn)
2997 create_file(fn)
2998
Benjamin Peterson799bd802011-08-31 22:15:17 -04002999 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003000 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003001 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003002
Victor Stinnerf12e5062011-10-16 22:12:03 +02003003 init_xattr = listxattr(fn)
3004 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01003005
Larry Hastings9cf065c2012-06-22 16:30:09 -07003006 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003007 xattr = set(init_xattr)
3008 xattr.add("user.test")
3009 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003010 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3011 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3012 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01003013
Benjamin Peterson799bd802011-08-31 22:15:17 -04003014 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003015 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003016 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01003017
Benjamin Peterson799bd802011-08-31 22:15:17 -04003018 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003019 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003020 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003021
Larry Hastings9cf065c2012-06-22 16:30:09 -07003022 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003023 xattr.add("user.test2")
3024 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003025 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01003026
Benjamin Peterson799bd802011-08-31 22:15:17 -04003027 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003028 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003029 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01003030
Victor Stinnerf12e5062011-10-16 22:12:03 +02003031 xattr.remove("user.test")
3032 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07003033 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3034 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3035 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3036 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003037 many = sorted("user.test{}".format(i) for i in range(100))
3038 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003039 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02003040 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04003041
Larry Hastings9cf065c2012-06-22 16:30:09 -07003042 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003043 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003044 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01003045
3046 self._check_xattrs_str(os.fsencode, *args, **kwargs)
3047 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003048
3049 def test_simple(self):
3050 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3051 os.listxattr)
3052
3053 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07003054 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3055 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003056
3057 def test_fds(self):
3058 def getxattr(path, *args):
3059 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003060 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003061 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003062 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003063 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003064 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01003065 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003066 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003067 def listxattr(path, *args):
3068 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07003069 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04003070 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3071
3072
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003073@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3074class TermsizeTests(unittest.TestCase):
3075 def test_does_not_crash(self):
3076 """Check if get_terminal_size() returns a meaningful value.
3077
3078 There's no easy portable way to actually check the size of the
3079 terminal, so let's check if it returns something sensible instead.
3080 """
3081 try:
3082 size = os.get_terminal_size()
3083 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003084 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003085 # Under win32 a generic OSError can be thrown if the
3086 # handle cannot be retrieved
3087 self.skipTest("failed to query terminal size")
3088 raise
3089
Antoine Pitroucfade362012-02-08 23:48:59 +01003090 self.assertGreaterEqual(size.columns, 0)
3091 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003092
3093 def test_stty_match(self):
3094 """Check if stty returns the same results
3095
3096 stty actually tests stdin, so get_terminal_size is invoked on
3097 stdin explicitly. If stty succeeded, then get_terminal_size()
3098 should work too.
3099 """
3100 try:
3101 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01003102 except (FileNotFoundError, subprocess.CalledProcessError,
3103 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003104 self.skipTest("stty invocation failed")
3105 expected = (int(size[1]), int(size[0])) # reversed order
3106
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01003107 try:
3108 actual = os.get_terminal_size(sys.__stdin__.fileno())
3109 except OSError as e:
3110 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3111 # Under win32 a generic OSError can be thrown if the
3112 # handle cannot be retrieved
3113 self.skipTest("failed to query terminal size")
3114 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01003115 self.assertEqual(expected, actual)
3116
3117
Victor Stinner292c8352012-10-30 02:17:38 +01003118class OSErrorTests(unittest.TestCase):
3119 def setUp(self):
3120 class Str(str):
3121 pass
3122
Victor Stinnerafe17062012-10-31 22:47:43 +01003123 self.bytes_filenames = []
3124 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01003125 if support.TESTFN_UNENCODABLE is not None:
3126 decoded = support.TESTFN_UNENCODABLE
3127 else:
3128 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01003129 self.unicode_filenames.append(decoded)
3130 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01003131 if support.TESTFN_UNDECODABLE is not None:
3132 encoded = support.TESTFN_UNDECODABLE
3133 else:
3134 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01003135 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003136 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01003137 self.bytes_filenames.append(memoryview(encoded))
3138
3139 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01003140
3141 def test_oserror_filename(self):
3142 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01003143 (self.filenames, os.chdir,),
3144 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01003145 (self.filenames, os.lstat,),
3146 (self.filenames, os.open, os.O_RDONLY),
3147 (self.filenames, os.rmdir,),
3148 (self.filenames, os.stat,),
3149 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01003150 ]
3151 if sys.platform == "win32":
3152 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003153 (self.bytes_filenames, os.rename, b"dst"),
3154 (self.bytes_filenames, os.replace, b"dst"),
3155 (self.unicode_filenames, os.rename, "dst"),
3156 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07003157 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01003158 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01003159 else:
3160 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01003161 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01003162 (self.filenames, os.rename, "dst"),
3163 (self.filenames, os.replace, "dst"),
3164 ))
3165 if hasattr(os, "chown"):
3166 funcs.append((self.filenames, os.chown, 0, 0))
3167 if hasattr(os, "lchown"):
3168 funcs.append((self.filenames, os.lchown, 0, 0))
3169 if hasattr(os, "truncate"):
3170 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003171 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01003172 funcs.append((self.filenames, os.chflags, 0))
3173 if hasattr(os, "lchflags"):
3174 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01003175 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003176 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01003177 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003178 if sys.platform == "win32":
3179 funcs.append((self.bytes_filenames, os.link, b"dst"))
3180 funcs.append((self.unicode_filenames, os.link, "dst"))
3181 else:
3182 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003183 if hasattr(os, "listxattr"):
3184 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003185 (self.filenames, os.listxattr,),
3186 (self.filenames, os.getxattr, "user.test"),
3187 (self.filenames, os.setxattr, "user.test", b'user'),
3188 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003189 ))
3190 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003191 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003192 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003193 if sys.platform == "win32":
3194 funcs.append((self.unicode_filenames, os.readlink,))
3195 else:
3196 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003197
Steve Dowercc16be82016-09-08 10:35:16 -07003198
Victor Stinnerafe17062012-10-31 22:47:43 +01003199 for filenames, func, *func_args in funcs:
3200 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003201 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003202 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003203 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003204 else:
3205 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3206 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003207 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003208 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003209 except UnicodeDecodeError:
3210 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003211 else:
3212 self.fail("No exception thrown by {}".format(func))
3213
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003214class CPUCountTests(unittest.TestCase):
3215 def test_cpu_count(self):
3216 cpus = os.cpu_count()
3217 if cpus is not None:
3218 self.assertIsInstance(cpus, int)
3219 self.assertGreater(cpus, 0)
3220 else:
3221 self.skipTest("Could not determine the number of CPUs")
3222
Victor Stinnerdaf45552013-08-28 00:53:59 +02003223
3224class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003225 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003226 fd = os.open(__file__, os.O_RDONLY)
3227 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003228 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003229
Victor Stinnerdaf45552013-08-28 00:53:59 +02003230 os.set_inheritable(fd, True)
3231 self.assertEqual(os.get_inheritable(fd), True)
3232
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003233 @unittest.skipIf(fcntl is None, "need fcntl")
3234 def test_get_inheritable_cloexec(self):
3235 fd = os.open(__file__, os.O_RDONLY)
3236 self.addCleanup(os.close, fd)
3237 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003238
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003239 # clear FD_CLOEXEC flag
3240 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3241 flags &= ~fcntl.FD_CLOEXEC
3242 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003243
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003244 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003245
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003246 @unittest.skipIf(fcntl is None, "need fcntl")
3247 def test_set_inheritable_cloexec(self):
3248 fd = os.open(__file__, os.O_RDONLY)
3249 self.addCleanup(os.close, fd)
3250 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3251 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003252
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003253 os.set_inheritable(fd, True)
3254 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3255 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003256
Victor Stinnerdaf45552013-08-28 00:53:59 +02003257 def test_open(self):
3258 fd = os.open(__file__, os.O_RDONLY)
3259 self.addCleanup(os.close, fd)
3260 self.assertEqual(os.get_inheritable(fd), False)
3261
3262 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3263 def test_pipe(self):
3264 rfd, wfd = os.pipe()
3265 self.addCleanup(os.close, rfd)
3266 self.addCleanup(os.close, wfd)
3267 self.assertEqual(os.get_inheritable(rfd), False)
3268 self.assertEqual(os.get_inheritable(wfd), False)
3269
3270 def test_dup(self):
3271 fd1 = os.open(__file__, os.O_RDONLY)
3272 self.addCleanup(os.close, fd1)
3273
3274 fd2 = os.dup(fd1)
3275 self.addCleanup(os.close, fd2)
3276 self.assertEqual(os.get_inheritable(fd2), False)
3277
3278 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3279 def test_dup2(self):
3280 fd = os.open(__file__, os.O_RDONLY)
3281 self.addCleanup(os.close, fd)
3282
3283 # inheritable by default
3284 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003285 self.addCleanup(os.close, fd2)
3286 self.assertEqual(os.dup2(fd, fd2), fd2)
3287 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003288
3289 # force non-inheritable
3290 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003291 self.addCleanup(os.close, fd3)
3292 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3293 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003294
3295 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3296 def test_openpty(self):
3297 master_fd, slave_fd = os.openpty()
3298 self.addCleanup(os.close, master_fd)
3299 self.addCleanup(os.close, slave_fd)
3300 self.assertEqual(os.get_inheritable(master_fd), False)
3301 self.assertEqual(os.get_inheritable(slave_fd), False)
3302
3303
Brett Cannon3f9183b2016-08-26 14:44:48 -07003304class PathTConverterTests(unittest.TestCase):
3305 # tuples of (function name, allows fd arguments, additional arguments to
3306 # function, cleanup function)
3307 functions = [
3308 ('stat', True, (), None),
3309 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003310 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003311 ('chflags', False, (0,), None),
3312 ('lchflags', False, (0,), None),
3313 ('open', False, (0,), getattr(os, 'close', None)),
3314 ]
3315
3316 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003317 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003318 if os.name == 'nt':
3319 bytes_fspath = bytes_filename = None
3320 else:
3321 bytes_filename = support.TESTFN.encode('ascii')
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003322 bytes_fspath = FakePath(bytes_filename)
3323 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003324 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003325 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003326
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003327 int_fspath = FakePath(fd)
3328 str_fspath = FakePath(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003329
3330 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3331 with self.subTest(name=name):
3332 try:
3333 fn = getattr(os, name)
3334 except AttributeError:
3335 continue
3336
Brett Cannon8f96a302016-08-26 19:30:11 -07003337 for path in (str_filename, bytes_filename, str_fspath,
3338 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003339 if path is None:
3340 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003341 with self.subTest(name=name, path=path):
3342 result = fn(path, *extra_args)
3343 if cleanup_fn is not None:
3344 cleanup_fn(result)
3345
3346 with self.assertRaisesRegex(
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003347 TypeError, 'to return str or bytes'):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003348 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003349
3350 if allow_fd:
3351 result = fn(fd, *extra_args) # should not fail
3352 if cleanup_fn is not None:
3353 cleanup_fn(result)
3354 else:
3355 with self.assertRaisesRegex(
3356 TypeError,
3357 'os.PathLike'):
3358 fn(fd, *extra_args)
3359
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003360 def test_path_t_converter_and_custom_class(self):
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003361 msg = r'__fspath__\(\) to return str or bytes, not %s'
3362 with self.assertRaisesRegex(TypeError, msg % r'int'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003363 os.stat(FakePath(2))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003364 with self.assertRaisesRegex(TypeError, msg % r'float'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003365 os.stat(FakePath(2.34))
Serhiy Storchaka8d01eb42019-02-19 13:52:35 +02003366 with self.assertRaisesRegex(TypeError, msg % r'object'):
Pablo Galindo09fbcd62019-02-18 10:46:34 +00003367 os.stat(FakePath(object()))
3368
Brett Cannon3f9183b2016-08-26 14:44:48 -07003369
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003370@unittest.skipUnless(hasattr(os, 'get_blocking'),
3371 'needs os.get_blocking() and os.set_blocking()')
3372class BlockingTests(unittest.TestCase):
3373 def test_blocking(self):
3374 fd = os.open(__file__, os.O_RDONLY)
3375 self.addCleanup(os.close, fd)
3376 self.assertEqual(os.get_blocking(fd), True)
3377
3378 os.set_blocking(fd, False)
3379 self.assertEqual(os.get_blocking(fd), False)
3380
3381 os.set_blocking(fd, True)
3382 self.assertEqual(os.get_blocking(fd), True)
3383
3384
Yury Selivanov97e2e062014-09-26 12:33:06 -04003385
3386class ExportsTests(unittest.TestCase):
3387 def test_os_all(self):
3388 self.assertIn('open', os.__all__)
3389 self.assertIn('walk', os.__all__)
3390
3391
Victor Stinner6036e442015-03-08 01:58:04 +01003392class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003393 check_no_resource_warning = support.check_no_resource_warning
3394
Victor Stinner6036e442015-03-08 01:58:04 +01003395 def setUp(self):
3396 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003397 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003398 self.addCleanup(support.rmtree, self.path)
3399 os.mkdir(self.path)
3400
3401 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003402 path = self.bytes_path if isinstance(name, bytes) else self.path
3403 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003404 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003405 return filename
3406
3407 def get_entries(self, names):
3408 entries = dict((entry.name, entry)
3409 for entry in os.scandir(self.path))
3410 self.assertEqual(sorted(entries.keys()), names)
3411 return entries
3412
3413 def assert_stat_equal(self, stat1, stat2, skip_fields):
3414 if skip_fields:
3415 for attr in dir(stat1):
3416 if not attr.startswith("st_"):
3417 continue
3418 if attr in ("st_dev", "st_ino", "st_nlink"):
3419 continue
3420 self.assertEqual(getattr(stat1, attr),
3421 getattr(stat2, attr),
3422 (stat1, stat2, attr))
3423 else:
3424 self.assertEqual(stat1, stat2)
3425
3426 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003427 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003428 self.assertEqual(entry.name, name)
3429 self.assertEqual(entry.path, os.path.join(self.path, name))
3430 self.assertEqual(entry.inode(),
3431 os.stat(entry.path, follow_symlinks=False).st_ino)
3432
3433 entry_stat = os.stat(entry.path)
3434 self.assertEqual(entry.is_dir(),
3435 stat.S_ISDIR(entry_stat.st_mode))
3436 self.assertEqual(entry.is_file(),
3437 stat.S_ISREG(entry_stat.st_mode))
3438 self.assertEqual(entry.is_symlink(),
3439 os.path.islink(entry.path))
3440
3441 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3442 self.assertEqual(entry.is_dir(follow_symlinks=False),
3443 stat.S_ISDIR(entry_lstat.st_mode))
3444 self.assertEqual(entry.is_file(follow_symlinks=False),
3445 stat.S_ISREG(entry_lstat.st_mode))
3446
3447 self.assert_stat_equal(entry.stat(),
3448 entry_stat,
3449 os.name == 'nt' and not is_symlink)
3450 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3451 entry_lstat,
3452 os.name == 'nt')
3453
3454 def test_attributes(self):
3455 link = hasattr(os, 'link')
3456 symlink = support.can_symlink()
3457
3458 dirname = os.path.join(self.path, "dir")
3459 os.mkdir(dirname)
3460 filename = self.create_file("file.txt")
3461 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003462 try:
3463 os.link(filename, os.path.join(self.path, "link_file.txt"))
3464 except PermissionError as e:
3465 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003466 if symlink:
3467 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3468 target_is_directory=True)
3469 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3470
3471 names = ['dir', 'file.txt']
3472 if link:
3473 names.append('link_file.txt')
3474 if symlink:
3475 names.extend(('symlink_dir', 'symlink_file.txt'))
3476 entries = self.get_entries(names)
3477
3478 entry = entries['dir']
3479 self.check_entry(entry, 'dir', True, False, False)
3480
3481 entry = entries['file.txt']
3482 self.check_entry(entry, 'file.txt', False, True, False)
3483
3484 if link:
3485 entry = entries['link_file.txt']
3486 self.check_entry(entry, 'link_file.txt', False, True, False)
3487
3488 if symlink:
3489 entry = entries['symlink_dir']
3490 self.check_entry(entry, 'symlink_dir', True, False, True)
3491
3492 entry = entries['symlink_file.txt']
3493 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3494
3495 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003496 path = self.bytes_path if isinstance(name, bytes) else self.path
3497 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003498 self.assertEqual(len(entries), 1)
3499
3500 entry = entries[0]
3501 self.assertEqual(entry.name, name)
3502 return entry
3503
Brett Cannon96881cd2016-06-10 14:37:21 -07003504 def create_file_entry(self, name='file.txt'):
3505 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003506 return self.get_entry(os.path.basename(filename))
3507
3508 def test_current_directory(self):
3509 filename = self.create_file()
3510 old_dir = os.getcwd()
3511 try:
3512 os.chdir(self.path)
3513
3514 # call scandir() without parameter: it must list the content
3515 # of the current directory
3516 entries = dict((entry.name, entry) for entry in os.scandir())
3517 self.assertEqual(sorted(entries.keys()),
3518 [os.path.basename(filename)])
3519 finally:
3520 os.chdir(old_dir)
3521
3522 def test_repr(self):
3523 entry = self.create_file_entry()
3524 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3525
Brett Cannon96881cd2016-06-10 14:37:21 -07003526 def test_fspath_protocol(self):
3527 entry = self.create_file_entry()
3528 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3529
3530 def test_fspath_protocol_bytes(self):
3531 bytes_filename = os.fsencode('bytesfile.txt')
3532 bytes_entry = self.create_file_entry(name=bytes_filename)
3533 fspath = os.fspath(bytes_entry)
3534 self.assertIsInstance(fspath, bytes)
3535 self.assertEqual(fspath,
3536 os.path.join(os.fsencode(self.path),bytes_filename))
3537
Victor Stinner6036e442015-03-08 01:58:04 +01003538 def test_removed_dir(self):
3539 path = os.path.join(self.path, 'dir')
3540
3541 os.mkdir(path)
3542 entry = self.get_entry('dir')
3543 os.rmdir(path)
3544
3545 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3546 if os.name == 'nt':
3547 self.assertTrue(entry.is_dir())
3548 self.assertFalse(entry.is_file())
3549 self.assertFalse(entry.is_symlink())
3550 if os.name == 'nt':
3551 self.assertRaises(FileNotFoundError, entry.inode)
3552 # don't fail
3553 entry.stat()
3554 entry.stat(follow_symlinks=False)
3555 else:
3556 self.assertGreater(entry.inode(), 0)
3557 self.assertRaises(FileNotFoundError, entry.stat)
3558 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3559
3560 def test_removed_file(self):
3561 entry = self.create_file_entry()
3562 os.unlink(entry.path)
3563
3564 self.assertFalse(entry.is_dir())
3565 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3566 if os.name == 'nt':
3567 self.assertTrue(entry.is_file())
3568 self.assertFalse(entry.is_symlink())
3569 if os.name == 'nt':
3570 self.assertRaises(FileNotFoundError, entry.inode)
3571 # don't fail
3572 entry.stat()
3573 entry.stat(follow_symlinks=False)
3574 else:
3575 self.assertGreater(entry.inode(), 0)
3576 self.assertRaises(FileNotFoundError, entry.stat)
3577 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3578
3579 def test_broken_symlink(self):
3580 if not support.can_symlink():
3581 return self.skipTest('cannot create symbolic link')
3582
3583 filename = self.create_file("file.txt")
3584 os.symlink(filename,
3585 os.path.join(self.path, "symlink.txt"))
3586 entries = self.get_entries(['file.txt', 'symlink.txt'])
3587 entry = entries['symlink.txt']
3588 os.unlink(filename)
3589
3590 self.assertGreater(entry.inode(), 0)
3591 self.assertFalse(entry.is_dir())
3592 self.assertFalse(entry.is_file()) # broken symlink returns False
3593 self.assertFalse(entry.is_dir(follow_symlinks=False))
3594 self.assertFalse(entry.is_file(follow_symlinks=False))
3595 self.assertTrue(entry.is_symlink())
3596 self.assertRaises(FileNotFoundError, entry.stat)
3597 # don't fail
3598 entry.stat(follow_symlinks=False)
3599
3600 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003601 self.create_file("file.txt")
3602
3603 path_bytes = os.fsencode(self.path)
3604 entries = list(os.scandir(path_bytes))
3605 self.assertEqual(len(entries), 1, entries)
3606 entry = entries[0]
3607
3608 self.assertEqual(entry.name, b'file.txt')
3609 self.assertEqual(entry.path,
3610 os.fsencode(os.path.join(self.path, 'file.txt')))
3611
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003612 def test_bytes_like(self):
3613 self.create_file("file.txt")
3614
3615 for cls in bytearray, memoryview:
3616 path_bytes = cls(os.fsencode(self.path))
3617 with self.assertWarns(DeprecationWarning):
3618 entries = list(os.scandir(path_bytes))
3619 self.assertEqual(len(entries), 1, entries)
3620 entry = entries[0]
3621
3622 self.assertEqual(entry.name, b'file.txt')
3623 self.assertEqual(entry.path,
3624 os.fsencode(os.path.join(self.path, 'file.txt')))
3625 self.assertIs(type(entry.name), bytes)
3626 self.assertIs(type(entry.path), bytes)
3627
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003628 @unittest.skipUnless(os.listdir in os.supports_fd,
3629 'fd support for listdir required for this test.')
3630 def test_fd(self):
3631 self.assertIn(os.scandir, os.supports_fd)
3632 self.create_file('file.txt')
3633 expected_names = ['file.txt']
3634 if support.can_symlink():
3635 os.symlink('file.txt', os.path.join(self.path, 'link'))
3636 expected_names.append('link')
3637
3638 fd = os.open(self.path, os.O_RDONLY)
3639 try:
3640 with os.scandir(fd) as it:
3641 entries = list(it)
3642 names = [entry.name for entry in entries]
3643 self.assertEqual(sorted(names), expected_names)
3644 self.assertEqual(names, os.listdir(fd))
3645 for entry in entries:
3646 self.assertEqual(entry.path, entry.name)
3647 self.assertEqual(os.fspath(entry), entry.name)
3648 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3649 if os.stat in os.supports_dir_fd:
3650 st = os.stat(entry.name, dir_fd=fd)
3651 self.assertEqual(entry.stat(), st)
3652 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3653 self.assertEqual(entry.stat(follow_symlinks=False), st)
3654 finally:
3655 os.close(fd)
3656
Victor Stinner6036e442015-03-08 01:58:04 +01003657 def test_empty_path(self):
3658 self.assertRaises(FileNotFoundError, os.scandir, '')
3659
3660 def test_consume_iterator_twice(self):
3661 self.create_file("file.txt")
3662 iterator = os.scandir(self.path)
3663
3664 entries = list(iterator)
3665 self.assertEqual(len(entries), 1, entries)
3666
3667 # check than consuming the iterator twice doesn't raise exception
3668 entries2 = list(iterator)
3669 self.assertEqual(len(entries2), 0, entries2)
3670
3671 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003672 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003673 self.assertRaises(TypeError, os.scandir, obj)
3674
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003675 def test_close(self):
3676 self.create_file("file.txt")
3677 self.create_file("file2.txt")
3678 iterator = os.scandir(self.path)
3679 next(iterator)
3680 iterator.close()
3681 # multiple closes
3682 iterator.close()
3683 with self.check_no_resource_warning():
3684 del iterator
3685
3686 def test_context_manager(self):
3687 self.create_file("file.txt")
3688 self.create_file("file2.txt")
3689 with os.scandir(self.path) as iterator:
3690 next(iterator)
3691 with self.check_no_resource_warning():
3692 del iterator
3693
3694 def test_context_manager_close(self):
3695 self.create_file("file.txt")
3696 self.create_file("file2.txt")
3697 with os.scandir(self.path) as iterator:
3698 next(iterator)
3699 iterator.close()
3700
3701 def test_context_manager_exception(self):
3702 self.create_file("file.txt")
3703 self.create_file("file2.txt")
3704 with self.assertRaises(ZeroDivisionError):
3705 with os.scandir(self.path) as iterator:
3706 next(iterator)
3707 1/0
3708 with self.check_no_resource_warning():
3709 del iterator
3710
3711 def test_resource_warning(self):
3712 self.create_file("file.txt")
3713 self.create_file("file2.txt")
3714 iterator = os.scandir(self.path)
3715 next(iterator)
3716 with self.assertWarns(ResourceWarning):
3717 del iterator
3718 support.gc_collect()
3719 # exhausted iterator
3720 iterator = os.scandir(self.path)
3721 list(iterator)
3722 with self.check_no_resource_warning():
3723 del iterator
3724
Victor Stinner6036e442015-03-08 01:58:04 +01003725
Ethan Furmancdc08792016-06-02 15:06:09 -07003726class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003727
3728 # Abstracted so it can be overridden to test pure Python implementation
3729 # if a C version is provided.
3730 fspath = staticmethod(os.fspath)
3731
Ethan Furmancdc08792016-06-02 15:06:09 -07003732 def test_return_bytes(self):
3733 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003734 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003735
3736 def test_return_string(self):
3737 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003738 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003739
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003740 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003741 for p in "path/like/object", b"path/like/object":
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003742 pathlike = FakePath(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003743
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003744 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003745 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3746 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3747
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003748 def test_pathlike(self):
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003749 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3750 self.assertTrue(issubclass(FakePath, os.PathLike))
3751 self.assertTrue(isinstance(FakePath('x'), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003752
Ethan Furmancdc08792016-06-02 15:06:09 -07003753 def test_garbage_in_exception_out(self):
3754 vapor = type('blah', (), {})
3755 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003756 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003757
3758 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003759 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003760
Brett Cannon044283a2016-07-15 10:41:49 -07003761 def test_bad_pathlike(self):
3762 # __fspath__ returns a value other than str or bytes.
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003763 self.assertRaises(TypeError, self.fspath, FakePath(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003764 # __fspath__ attribute that is not callable.
3765 c = type('foo', (), {})
3766 c.__fspath__ = 1
3767 self.assertRaises(TypeError, self.fspath, c())
3768 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003769 self.assertRaises(ZeroDivisionError, self.fspath,
Serhiy Storchakab21d1552018-03-02 11:53:51 +02003770 FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003771
Victor Stinnerc29b5852017-11-02 07:28:27 -07003772
3773class TimesTests(unittest.TestCase):
3774 def test_times(self):
3775 times = os.times()
3776 self.assertIsInstance(times, os.times_result)
3777
3778 for field in ('user', 'system', 'children_user', 'children_system',
3779 'elapsed'):
3780 value = getattr(times, field)
3781 self.assertIsInstance(value, float)
3782
3783 if os.name == 'nt':
3784 self.assertEqual(times.children_user, 0)
3785 self.assertEqual(times.children_system, 0)
3786 self.assertEqual(times.elapsed, 0)
3787
3788
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003789# Only test if the C version is provided, otherwise TestPEP519 already tested
3790# the pure Python implementation.
3791if hasattr(os, "_fspath"):
3792 class TestPEP519PurePython(TestPEP519):
3793
3794 """Explicitly test the pure Python implementation of os.fspath()."""
3795
3796 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003797
3798
Fred Drake2e2be372001-09-20 21:33:42 +00003799if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003800 unittest.main()