blob: 97202109ee3f4592cfa3814aaeeafbbc438eb936 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner138adb82015-06-12 22:01:54 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner138adb82015-06-12 22:01:54 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner138adb82015-06-12 22:01:54 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner138adb82015-06-12 22:01:54 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner138adb82015-06-12 22:01:54 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner138adb82015-06-12 22:01:54 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import grp
50 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
51 if hasattr(os, 'getgid'):
52 process_gid = os.getgid()
53 if process_gid not in groups:
54 groups.append(process_gid)
55except ImportError:
56 groups = []
57try:
58 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
60except ImportError:
61 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Mark Dickinson7cf03892010-04-16 13:45:35 +000073# Detect whether we're on a Linux system that uses the (now outdated
74# and unmaintained) linuxthreads threading library. There's an issue
75# when combining linuxthreads with a failed execv call: see
76# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020077if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79else:
80 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000081
Stefan Krahebee49a2013-01-17 15:31:00 +010082# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Berker Peksagbf3c1c32016-09-18 13:56:29 +030085def create_file(filename, content=b'content'):
86 with open(filename, "xb", 0) as fp:
87 fp.write(content)
88
89
Thomas Wouters0e3f5912006-08-11 14:57:12 +000090# Tests creating TESTFN
91class FileTests(unittest.TestCase):
92 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000093 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000094 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000095 tearDown = setUp
96
97 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000098 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000099 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000100 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000101
Christian Heimesfdab48e2008-01-20 09:06:41 +0000102 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000103 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
104 # We must allocate two consecutive file descriptors, otherwise
105 # it will mess up other file descriptors (perhaps even the three
106 # standard ones).
107 second = os.dup(first)
108 try:
109 retries = 0
110 while second != first + 1:
111 os.close(first)
112 retries += 1
113 if retries > 10:
114 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000115 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000116 first, second = second, os.dup(second)
117 finally:
118 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000119 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000120 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000121 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000122
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000123 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000124 def test_rename(self):
125 path = support.TESTFN
126 old = sys.getrefcount(path)
127 self.assertRaises(TypeError, os.rename, path, 0)
128 new = sys.getrefcount(path)
129 self.assertEqual(old, new)
130
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000131 def test_read(self):
132 with open(support.TESTFN, "w+b") as fobj:
133 fobj.write(b"spam")
134 fobj.flush()
135 fd = fobj.fileno()
136 os.lseek(fd, 0, 0)
137 s = os.read(fd, 4)
138 self.assertEqual(type(s), bytes)
139 self.assertEqual(s, b"spam")
140
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200141 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200142 # Skip the test on 32-bit platforms: the number of bytes must fit in a
143 # Py_ssize_t type
144 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
145 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200146 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
147 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200148 with open(support.TESTFN, "wb") as fp:
149 fp.write(b'test')
150 self.addCleanup(support.unlink, support.TESTFN)
151
152 # Issue #21932: Make sure that os.read() does not raise an
153 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200154 with open(support.TESTFN, "rb") as fp:
155 data = os.read(fp.fileno(), size)
156
157 # The test does not try to read more than 2 GB at once because the
158 # operating system is free to return less bytes than requested.
159 self.assertEqual(data, b'test')
160
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000161 def test_write(self):
162 # os.write() accepts bytes- and buffer-like objects but not strings
163 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
164 self.assertRaises(TypeError, os.write, fd, "beans")
165 os.write(fd, b"bacon\n")
166 os.write(fd, bytearray(b"eggs\n"))
167 os.write(fd, memoryview(b"spam\n"))
168 os.close(fd)
169 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000170 self.assertEqual(fobj.read().splitlines(),
171 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000172
Victor Stinnere0daff12011-03-20 23:36:35 +0100173 def write_windows_console(self, *args):
174 retcode = subprocess.call(args,
175 # use a new console to not flood the test output
176 creationflags=subprocess.CREATE_NEW_CONSOLE,
177 # use a shell to hide the console window (SW_HIDE)
178 shell=True)
179 self.assertEqual(retcode, 0)
180
181 @unittest.skipUnless(sys.platform == 'win32',
182 'test specific to the Windows console')
183 def test_write_windows_console(self):
184 # Issue #11395: the Windows console returns an error (12: not enough
185 # space error) on writing into stdout if stdout mode is binary and the
186 # length is greater than 66,000 bytes (or less, depending on heap
187 # usage).
188 code = "print('x' * 100000)"
189 self.write_windows_console(sys.executable, "-c", code)
190 self.write_windows_console(sys.executable, "-u", "-c", code)
191
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000192 def fdopen_helper(self, *args):
193 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200194 f = os.fdopen(fd, *args)
195 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000196
197 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200198 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
199 os.close(fd)
200
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000201 self.fdopen_helper()
202 self.fdopen_helper('r')
203 self.fdopen_helper('r', 100)
204
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100205 def test_replace(self):
206 TESTFN2 = support.TESTFN + ".2"
207 with open(support.TESTFN, 'w') as f:
208 f.write("1")
209 with open(TESTFN2, 'w') as f:
210 f.write("2")
211 self.addCleanup(os.unlink, TESTFN2)
212 os.replace(support.TESTFN, TESTFN2)
213 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
214 with open(TESTFN2, 'r') as f:
215 self.assertEqual(f.read(), "1")
216
Martin Panterbf19d162015-09-09 01:01:13 +0000217 def test_open_keywords(self):
218 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
219 dir_fd=None)
220 os.close(f)
221
222 def test_symlink_keywords(self):
223 symlink = support.get_attribute(os, "symlink")
224 try:
225 symlink(src='target', dst=support.TESTFN,
226 target_is_directory=False, dir_fd=None)
227 except (NotImplementedError, OSError):
228 pass # No OS support or unprivileged user
229
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200230
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231# Test attributes on return values from os.*stat* family.
232class StatAttributeTests(unittest.TestCase):
233 def setUp(self):
Berker Peksagbf3c1c32016-09-18 13:56:29 +0300234 self.fname = support.TESTFN
235 self.addCleanup(support.unlink, self.fname)
236 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237
Serhiy Storchaka43767632013-11-03 21:31:38 +0200238 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000239 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000240 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
242 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000243 self.assertEqual(result[stat.ST_SIZE], 3)
244 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000245
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000246 # Make sure all the attributes are there
247 members = dir(result)
248 for name in dir(stat):
249 if name[:3] == 'ST_':
250 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000251 if name.endswith("TIME"):
252 def trunc(x): return int(x)
253 else:
254 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000255 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000256 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000257 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000258
Larry Hastings6fe20b32012-04-19 15:07:49 -0700259 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700260 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700261 for name in 'st_atime st_mtime st_ctime'.split():
262 floaty = int(getattr(result, name) * 100000)
263 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700264 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700265
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000266 try:
267 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200268 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 except IndexError:
270 pass
271
272 # Make sure that assignment fails
273 try:
274 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200275 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000276 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000277 pass
278
279 try:
280 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200281 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000282 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000283 pass
284
285 try:
286 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200287 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000288 except AttributeError:
289 pass
290
291 # Use the stat_result constructor with a too-short tuple.
292 try:
293 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200294 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 except TypeError:
296 pass
297
Ezio Melotti42da6632011-03-15 05:18:48 +0200298 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000299 try:
300 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
301 except TypeError:
302 pass
303
Antoine Pitrou38425292010-09-21 18:19:07 +0000304 def test_stat_attributes(self):
305 self.check_stat_attributes(self.fname)
306
307 def test_stat_attributes_bytes(self):
308 try:
309 fname = self.fname.encode(sys.getfilesystemencoding())
310 except UnicodeEncodeError:
311 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100312 with warnings.catch_warnings():
313 warnings.simplefilter("ignore", DeprecationWarning)
314 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000315
Christian Heimes25827622013-10-12 01:27:08 +0200316 def test_stat_result_pickle(self):
317 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200318 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
319 p = pickle.dumps(result, proto)
320 self.assertIn(b'stat_result', p)
321 if proto < 4:
322 self.assertIn(b'cos\nstat_result\n', p)
323 unpickled = pickle.loads(p)
324 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200325
Serhiy Storchaka43767632013-11-03 21:31:38 +0200326 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000327 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000328 try:
329 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000330 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000331 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000332 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200333 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000334
335 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000336 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000337
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000338 # Make sure all the attributes are there.
339 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
340 'ffree', 'favail', 'flag', 'namemax')
341 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000342 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000343
344 # Make sure that assignment really fails
345 try:
346 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200347 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000348 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000349 pass
350
351 try:
352 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200353 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354 except AttributeError:
355 pass
356
357 # Use the constructor with a too-short tuple.
358 try:
359 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200360 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361 except TypeError:
362 pass
363
Ezio Melotti42da6632011-03-15 05:18:48 +0200364 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 try:
366 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
367 except TypeError:
368 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000369
Christian Heimes25827622013-10-12 01:27:08 +0200370 @unittest.skipUnless(hasattr(os, 'statvfs'),
371 "need os.statvfs()")
372 def test_statvfs_result_pickle(self):
373 try:
374 result = os.statvfs(self.fname)
375 except OSError as e:
376 # On AtheOS, glibc always returns ENOSYS
377 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200378 self.skipTest('os.statvfs() failed with ENOSYS')
379
Serhiy Storchakabad12572014-12-15 14:03:42 +0200380 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
381 p = pickle.dumps(result, proto)
382 self.assertIn(b'statvfs_result', p)
383 if proto < 4:
384 self.assertIn(b'cos\nstatvfs_result\n', p)
385 unpickled = pickle.loads(p)
386 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200387
Serhiy Storchaka43767632013-11-03 21:31:38 +0200388 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
389 def test_1686475(self):
390 # Verify that an open file can be stat'ed
391 try:
392 os.stat(r"c:\pagefile.sys")
393 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600394 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200395 except OSError as e:
396 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000397
Serhiy Storchaka43767632013-11-03 21:31:38 +0200398 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
399 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
400 def test_15261(self):
401 # Verify that stat'ing a closed fd does not cause crash
402 r, w = os.pipe()
403 try:
404 os.stat(r) # should not raise error
405 finally:
406 os.close(r)
407 os.close(w)
408 with self.assertRaises(OSError) as ctx:
409 os.stat(r)
410 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100411
Zachary Ware63f277b2014-06-19 09:46:37 -0500412 def check_file_attributes(self, result):
413 self.assertTrue(hasattr(result, 'st_file_attributes'))
414 self.assertTrue(isinstance(result.st_file_attributes, int))
415 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
416
417 @unittest.skipUnless(sys.platform == "win32",
418 "st_file_attributes is Win32 specific")
419 def test_file_attributes(self):
420 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
421 result = os.stat(self.fname)
422 self.check_file_attributes(result)
423 self.assertEqual(
424 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
425 0)
426
427 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Berker Peksagbf3c1c32016-09-18 13:56:29 +0300428 dirname = support.TESTFN + "dir"
429 os.mkdir(dirname)
430 self.addCleanup(os.rmdir, dirname)
431
432 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500433 self.check_file_attributes(result)
434 self.assertEqual(
435 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
436 stat.FILE_ATTRIBUTE_DIRECTORY)
437
Berker Peksag0b4dc482016-09-17 15:49:59 +0300438 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
439 def test_access_denied(self):
440 # Default to FindFirstFile WIN32_FIND_DATA when access is
441 # denied. See issue 28075.
442 # os.environ['TEMP'] should be located on a volume that
443 # supports file ACLs.
444 fname = os.path.join(os.environ['TEMP'], self.fname)
445 self.addCleanup(support.unlink, fname)
Berker Peksagbf3c1c32016-09-18 13:56:29 +0300446 create_file(fname, b'ABC')
Berker Peksag0b4dc482016-09-17 15:49:59 +0300447 # Deny the right to [S]YNCHRONIZE on the file to
448 # force CreateFile to fail with ERROR_ACCESS_DENIED.
449 DETACHED_PROCESS = 8
450 subprocess.check_call(
451 ['icacls.exe', fname, '/deny', 'Users:(S)'],
452 creationflags=DETACHED_PROCESS
453 )
454 result = os.stat(fname)
455 self.assertNotEqual(result.st_size, 0)
456
Victor Stinner138adb82015-06-12 22:01:54 +0200457
458class UtimeTests(unittest.TestCase):
459 def setUp(self):
460 self.dirname = support.TESTFN
461 self.fname = os.path.join(self.dirname, "f1")
462
463 self.addCleanup(support.rmtree, self.dirname)
464 os.mkdir(self.dirname)
465 with open(self.fname, 'wb') as fp:
466 fp.write(b"ABC")
467
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200468 def restore_float_times(state):
469 with warnings.catch_warnings():
470 warnings.simplefilter("ignore", DeprecationWarning)
471
472 os.stat_float_times(state)
473
Victor Stinner138adb82015-06-12 22:01:54 +0200474 # ensure that st_atime and st_mtime are float
475 with warnings.catch_warnings():
476 warnings.simplefilter("ignore", DeprecationWarning)
477
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200478 old_float_times = os.stat_float_times(-1)
479 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner138adb82015-06-12 22:01:54 +0200480
481 os.stat_float_times(True)
482
483 def support_subsecond(self, filename):
484 # Heuristic to check if the filesystem supports timestamp with
485 # subsecond resolution: check if float and int timestamps are different
486 st = os.stat(filename)
487 return ((st.st_atime != st[7])
488 or (st.st_mtime != st[8])
489 or (st.st_ctime != st[9]))
490
491 def _test_utime(self, set_time, filename=None):
492 if not filename:
493 filename = self.fname
494
495 support_subsecond = self.support_subsecond(filename)
496 if support_subsecond:
497 # Timestamp with a resolution of 1 microsecond (10^-6).
498 #
499 # The resolution of the C internal function used by os.utime()
500 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
501 # test with a resolution of 1 ns requires more work:
502 # see the issue #15745.
503 atime_ns = 1002003000 # 1.002003 seconds
504 mtime_ns = 4005006000 # 4.005006 seconds
505 else:
506 # use a resolution of 1 second
507 atime_ns = 5 * 10**9
508 mtime_ns = 8 * 10**9
509
510 set_time(filename, (atime_ns, mtime_ns))
511 st = os.stat(filename)
512
513 if support_subsecond:
514 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
515 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
516 else:
517 self.assertEqual(st.st_atime, atime_ns * 1e-9)
518 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
519 self.assertEqual(st.st_atime_ns, atime_ns)
520 self.assertEqual(st.st_mtime_ns, mtime_ns)
521
522 def test_utime(self):
523 def set_time(filename, ns):
524 # test the ns keyword parameter
525 os.utime(filename, ns=ns)
526 self._test_utime(set_time)
527
528 @staticmethod
529 def ns_to_sec(ns):
530 # Convert a number of nanosecond (int) to a number of seconds (float).
531 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
532 # issue, os.utime() rounds towards minus infinity.
533 return (ns * 1e-9) + 0.5e-9
534
535 def test_utime_by_indexed(self):
536 # pass times as floating point seconds as the second indexed parameter
537 def set_time(filename, ns):
538 atime_ns, mtime_ns = ns
539 atime = self.ns_to_sec(atime_ns)
540 mtime = self.ns_to_sec(mtime_ns)
541 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
542 # or utime(time_t)
543 os.utime(filename, (atime, mtime))
544 self._test_utime(set_time)
545
546 def test_utime_by_times(self):
547 def set_time(filename, ns):
548 atime_ns, mtime_ns = ns
549 atime = self.ns_to_sec(atime_ns)
550 mtime = self.ns_to_sec(mtime_ns)
551 # test the times keyword parameter
552 os.utime(filename, times=(atime, mtime))
553 self._test_utime(set_time)
554
555 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
556 "follow_symlinks support for utime required "
557 "for this test.")
558 def test_utime_nofollow_symlinks(self):
559 def set_time(filename, ns):
560 # use follow_symlinks=False to test utimensat(timespec)
561 # or lutimes(timeval)
562 os.utime(filename, ns=ns, follow_symlinks=False)
563 self._test_utime(set_time)
564
565 @unittest.skipUnless(os.utime in os.supports_fd,
566 "fd support for utime required for this test.")
567 def test_utime_fd(self):
568 def set_time(filename, ns):
569 with open(filename, 'wb') as fp:
570 # use a file descriptor to test futimens(timespec)
571 # or futimes(timeval)
572 os.utime(fp.fileno(), ns=ns)
573 self._test_utime(set_time)
574
575 @unittest.skipUnless(os.utime in os.supports_dir_fd,
576 "dir_fd support for utime required for this test.")
577 def test_utime_dir_fd(self):
578 def set_time(filename, ns):
579 dirname, name = os.path.split(filename)
580 dirfd = os.open(dirname, os.O_RDONLY)
581 try:
582 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
583 os.utime(name, dir_fd=dirfd, ns=ns)
584 finally:
585 os.close(dirfd)
586 self._test_utime(set_time)
587
588 def test_utime_directory(self):
589 def set_time(filename, ns):
590 # test calling os.utime() on a directory
591 os.utime(filename, ns=ns)
592 self._test_utime(set_time, filename=self.dirname)
593
594 def _test_utime_current(self, set_time):
595 # Get the system clock
596 current = time.time()
597
598 # Call os.utime() to set the timestamp to the current system clock
599 set_time(self.fname)
600
601 if not self.support_subsecond(self.fname):
602 delta = 1.0
603 else:
604 # On Windows, the usual resolution of time.time() is 15.6 ms
605 delta = 0.020
606 st = os.stat(self.fname)
607 msg = ("st_time=%r, current=%r, dt=%r"
608 % (st.st_mtime, current, st.st_mtime - current))
609 self.assertAlmostEqual(st.st_mtime, current,
610 delta=delta, msg=msg)
611
612 def test_utime_current(self):
613 def set_time(filename):
614 # Set to the current time in the new way
615 os.utime(self.fname)
616 self._test_utime_current(set_time)
617
618 def test_utime_current_old(self):
619 def set_time(filename):
620 # Set to the current time in the old explicit way.
621 os.utime(self.fname, None)
622 self._test_utime_current(set_time)
623
624 def get_file_system(self, path):
625 if sys.platform == 'win32':
626 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
627 import ctypes
628 kernel32 = ctypes.windll.kernel32
629 buf = ctypes.create_unicode_buffer("", 100)
630 ok = kernel32.GetVolumeInformationW(root, None, 0,
631 None, None, None,
632 buf, len(buf))
633 if ok:
634 return buf.value
635 # return None if the filesystem is unknown
636
637 def test_large_time(self):
638 # Many filesystems are limited to the year 2038. At least, the test
639 # pass with NTFS filesystem.
640 if self.get_file_system(self.dirname) != "NTFS":
641 self.skipTest("requires NTFS")
642
643 large = 5000000000 # some day in 2128
644 os.utime(self.fname, (large, large))
645 self.assertEqual(os.stat(self.fname).st_mtime, large)
646
647 def test_utime_invalid_arguments(self):
648 # seconds and nanoseconds parameters are mutually exclusive
649 with self.assertRaises(ValueError):
650 os.utime(self.fname, (5, 5), ns=(5, 5))
651
652
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000653from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000654
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000655class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000656 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000657 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000658
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000659 def setUp(self):
660 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000661 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000662 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000663 for key, value in self._reference().items():
664 os.environ[key] = value
665
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000666 def tearDown(self):
667 os.environ.clear()
668 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000669 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000670 os.environb.clear()
671 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000672
Christian Heimes90333392007-11-01 19:08:42 +0000673 def _reference(self):
674 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
675
676 def _empty_mapping(self):
677 os.environ.clear()
678 return os.environ
679
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000680 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300681 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000682 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000683 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300684 os.environ.update(HELLO="World")
685 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
686 value = popen.read().strip()
687 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000688
Ezio Melottic7e139b2012-09-26 20:01:34 +0300689 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000690 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300691 with os.popen(
692 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
693 it = iter(popen)
694 self.assertEqual(next(it), "line1\n")
695 self.assertEqual(next(it), "line2\n")
696 self.assertEqual(next(it), "line3\n")
697 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000698
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000699 # Verify environ keys and values from the OS are of the
700 # correct str type.
701 def test_keyvalue_types(self):
702 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000703 self.assertEqual(type(key), str)
704 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000705
Christian Heimes90333392007-11-01 19:08:42 +0000706 def test_items(self):
707 for key, value in self._reference().items():
708 self.assertEqual(os.environ.get(key), value)
709
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000710 # Issue 7310
711 def test___repr__(self):
712 """Check that the repr() of os.environ looks like environ({...})."""
713 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000714 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
715 '{!r}: {!r}'.format(key, value)
716 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000717
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000718 def test_get_exec_path(self):
719 defpath_list = os.defpath.split(os.pathsep)
720 test_path = ['/monty', '/python', '', '/flying/circus']
721 test_env = {'PATH': os.pathsep.join(test_path)}
722
723 saved_environ = os.environ
724 try:
725 os.environ = dict(test_env)
726 # Test that defaulting to os.environ works.
727 self.assertSequenceEqual(test_path, os.get_exec_path())
728 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
729 finally:
730 os.environ = saved_environ
731
732 # No PATH environment variable
733 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
734 # Empty PATH environment variable
735 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
736 # Supplied PATH environment variable
737 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
738
Victor Stinnerb745a742010-05-18 17:17:23 +0000739 if os.supports_bytes_environ:
740 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000741 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000742 # ignore BytesWarning warning
743 with warnings.catch_warnings(record=True):
744 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000745 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000746 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000747 pass
748 else:
749 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000750
751 # bytes key and/or value
752 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
753 ['abc'])
754 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
755 ['abc'])
756 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
757 ['abc'])
758
759 @unittest.skipUnless(os.supports_bytes_environ,
760 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000761 def test_environb(self):
762 # os.environ -> os.environb
763 value = 'euro\u20ac'
764 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000765 value_bytes = value.encode(sys.getfilesystemencoding(),
766 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000767 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000768 msg = "U+20AC character is not encodable to %s" % (
769 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000770 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000771 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000772 self.assertEqual(os.environ['unicode'], value)
773 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000774
775 # os.environb -> os.environ
776 value = b'\xff'
777 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000779 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000780 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000781
Charles-François Natali2966f102011-11-26 11:32:46 +0100782 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
783 # #13415).
784 @support.requires_freebsd_version(7)
785 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100786 def test_unset_error(self):
787 if sys.platform == "win32":
788 # an environment variable is limited to 32,767 characters
789 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100790 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100791 else:
792 # "=" is not allowed in a variable name
793 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100794 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100795
Victor Stinner6d101392013-04-14 16:35:04 +0200796 def test_key_type(self):
797 missing = 'missingkey'
798 self.assertNotIn(missing, os.environ)
799
Victor Stinner839e5ea2013-04-14 16:43:03 +0200800 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200801 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200802 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200803 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200804
Victor Stinner839e5ea2013-04-14 16:43:03 +0200805 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200806 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200807 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200808 self.assertTrue(cm.exception.__suppress_context__)
809
Victor Stinner6d101392013-04-14 16:35:04 +0200810
Tim Petersc4e09402003-04-25 07:11:48 +0000811class WalkTests(unittest.TestCase):
812 """Tests for os.walk()."""
813
Victor Stinner0561c532015-03-12 10:28:24 +0100814 # Wrapper to hide minor differences between os.walk and os.fwalk
815 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200816 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200817 if 'follow_symlinks' in kwargs:
818 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200819 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100820
Charles-François Natali7372b062012-02-05 15:15:38 +0100821 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100822 join = os.path.join
Tim Petersc4e09402003-04-25 07:11:48 +0000823
824 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000825 # TESTFN/
826 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000827 # tmp1
828 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000829 # tmp2
830 # SUB11/ no kids
831 # SUB2/ a file kid and a dirsymlink kid
832 # tmp3
833 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200834 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000835 # TEST2/
836 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100837 self.walk_path = join(support.TESTFN, "TEST1")
838 self.sub1_path = join(self.walk_path, "SUB1")
839 self.sub11_path = join(self.sub1_path, "SUB11")
840 sub2_path = join(self.walk_path, "SUB2")
841 tmp1_path = join(self.walk_path, "tmp1")
842 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000843 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100844 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000845 t2_path = join(support.TESTFN, "TEST2")
846 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200847 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000848
849 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100850 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000851 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000852 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100853
Guido van Rossumd8faa362007-04-27 19:54:29 +0000854 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000855 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000856 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
857 f.close()
858
Victor Stinner0561c532015-03-12 10:28:24 +0100859 if support.can_symlink():
860 os.symlink(os.path.abspath(t2_path), self.link_path)
861 os.symlink('broken', broken_link_path, True)
Serhiy Storchakaadca8462016-03-08 21:13:35 +0200862 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100863 else:
864 self.sub2_tree = (sub2_path, [], ["tmp3"])
865
866 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000867 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300868 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100869
Tim Petersc4e09402003-04-25 07:11:48 +0000870 self.assertEqual(len(all), 4)
871 # We can't know which order SUB1 and SUB2 will appear in.
872 # Not flipped: TESTFN, SUB1, SUB11, SUB2
873 # flipped: TESTFN, SUB2, SUB1, SUB11
874 flipped = all[0][1][0] != "SUB1"
875 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200876 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100877 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
878 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
879 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
880 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000881
Victor Stinner0561c532015-03-12 10:28:24 +0100882 def test_walk_prune(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000883 # Prune the search.
884 all = []
Victor Stinner0561c532015-03-12 10:28:24 +0100885 for root, dirs, files in self.walk(self.walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000886 all.append((root, dirs, files))
887 # Don't descend into SUB1.
888 if 'SUB1' in dirs:
889 # Note that this also mutates the dirs we appended to all!
890 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000891
Victor Stinner0561c532015-03-12 10:28:24 +0100892 self.assertEqual(len(all), 2)
893 self.assertEqual(all[0],
894 (self.walk_path, ["SUB2"], ["tmp1"]))
895
896 all[1][-1].sort()
897 self.assertEqual(all[1], self.sub2_tree)
898
899 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000900 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100901 all = list(self.walk(self.walk_path, topdown=False))
902
Tim Petersc4e09402003-04-25 07:11:48 +0000903 self.assertEqual(len(all), 4)
904 # We can't know which order SUB1 and SUB2 will appear in.
905 # Not flipped: SUB11, SUB1, SUB2, TESTFN
906 # flipped: SUB2, SUB11, SUB1, TESTFN
907 flipped = all[3][1][0] != "SUB1"
908 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200909 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100910 self.assertEqual(all[3],
911 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
912 self.assertEqual(all[flipped],
913 (self.sub11_path, [], []))
914 self.assertEqual(all[flipped + 1],
915 (self.sub1_path, ["SUB11"], ["tmp2"]))
916 self.assertEqual(all[2 - 2 * flipped],
917 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000918
Victor Stinner0561c532015-03-12 10:28:24 +0100919 def test_walk_symlink(self):
920 if not support.can_symlink():
921 self.skipTest("need symlink support")
922
923 # Walk, following symlinks.
924 walk_it = self.walk(self.walk_path, follow_symlinks=True)
925 for root, dirs, files in walk_it:
926 if root == self.link_path:
927 self.assertEqual(dirs, [])
928 self.assertEqual(files, ["tmp4"])
929 break
930 else:
931 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000932
933 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000934 # Tear everything down. This is a decent use for bottom-up on
935 # Windows, which doesn't have a recursive delete command. The
936 # (not so) subtlety is that rmdir will fail unless the dir's
937 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000938 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000939 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000940 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000941 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000942 dirname = os.path.join(root, name)
943 if not os.path.islink(dirname):
944 os.rmdir(dirname)
945 else:
946 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000947 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000948
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200949 def test_walk_bad_dir(self):
950 # Walk top-down.
951 errors = []
952 walk_it = self.walk(self.walk_path, onerror=errors.append)
953 root, dirs, files = next(walk_it)
954 self.assertFalse(errors)
955 dir1 = dirs[0]
956 dir1new = dir1 + '.new'
957 os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
958 roots = [r for r, d, f in walk_it]
959 self.assertTrue(errors)
960 self.assertNotIn(os.path.join(root, dir1), roots)
961 self.assertNotIn(os.path.join(root, dir1new), roots)
962 for dir2 in dirs[1:]:
963 self.assertIn(os.path.join(root, dir2), roots)
964
Charles-François Natali7372b062012-02-05 15:15:38 +0100965
966@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
967class FwalkTests(WalkTests):
968 """Tests for os.fwalk()."""
969
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200970 def walk(self, top, **kwargs):
971 for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +0100972 yield (root, dirs, files)
973
Larry Hastingsc48fe982012-06-25 04:49:05 -0700974 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
975 """
976 compare with walk() results.
977 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700978 walk_kwargs = walk_kwargs.copy()
979 fwalk_kwargs = fwalk_kwargs.copy()
980 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
981 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
982 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700983
Charles-François Natali7372b062012-02-05 15:15:38 +0100984 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700985 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100986 expected[root] = (set(dirs), set(files))
987
Larry Hastingsc48fe982012-06-25 04:49:05 -0700988 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100989 self.assertIn(root, expected)
990 self.assertEqual(expected[root], (set(dirs), set(files)))
991
Larry Hastingsc48fe982012-06-25 04:49:05 -0700992 def test_compare_to_walk(self):
993 kwargs = {'top': support.TESTFN}
994 self._compare_to_walk(kwargs, kwargs)
995
Charles-François Natali7372b062012-02-05 15:15:38 +0100996 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700997 try:
998 fd = os.open(".", os.O_RDONLY)
999 walk_kwargs = {'top': support.TESTFN}
1000 fwalk_kwargs = walk_kwargs.copy()
1001 fwalk_kwargs['dir_fd'] = fd
1002 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1003 finally:
1004 os.close(fd)
1005
1006 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001007 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001008 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1009 args = support.TESTFN, topdown, None
1010 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001011 # check that the FD is valid
1012 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001013 # redundant check
1014 os.stat(rootfd)
1015 # check that listdir() returns consistent information
1016 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001017
1018 def test_fd_leak(self):
1019 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1020 # we both check that calling fwalk() a large number of times doesn't
1021 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1022 minfd = os.dup(1)
1023 os.close(minfd)
1024 for i in range(256):
1025 for x in os.fwalk(support.TESTFN):
1026 pass
1027 newfd = os.dup(1)
1028 self.addCleanup(os.close, newfd)
1029 self.assertEqual(newfd, minfd)
1030
1031 def tearDown(self):
1032 # cleanup
1033 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
1034 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -07001035 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +01001036 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -07001037 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -07001038 if stat.S_ISDIR(st.st_mode):
1039 os.rmdir(name, dir_fd=rootfd)
1040 else:
1041 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +01001042 os.rmdir(support.TESTFN)
1043
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001044class BytesWalkTests(WalkTests):
1045 """Tests for os.walk() with bytes."""
Serhiy Storchakaada6db72016-03-08 21:26:26 +02001046 def setUp(self):
1047 super().setUp()
1048 self.stack = contextlib.ExitStack()
1049 if os.name == 'nt':
1050 self.stack.enter_context(warnings.catch_warnings())
1051 warnings.simplefilter("ignore", DeprecationWarning)
1052
1053 def tearDown(self):
1054 self.stack.close()
1055 super().tearDown()
1056
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001057 def walk(self, top, **kwargs):
1058 if 'follow_symlinks' in kwargs:
1059 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1060 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1061 root = os.fsdecode(broot)
1062 dirs = list(map(os.fsdecode, bdirs))
1063 files = list(map(os.fsdecode, bfiles))
1064 yield (root, dirs, files)
1065 bdirs[:] = list(map(os.fsencode, dirs))
1066 bfiles[:] = list(map(os.fsencode, files))
1067
Charles-François Natali7372b062012-02-05 15:15:38 +01001068
Guido van Rossume7ba4952007-06-06 23:52:48 +00001069class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001070 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001071 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001072
1073 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001074 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001075 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1076 os.makedirs(path) # Should work
1077 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1078 os.makedirs(path)
1079
1080 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001081 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001082 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1083 os.makedirs(path)
1084 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1085 'dir5', 'dir6')
1086 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001087
Terry Reedy5a22b652010-12-02 07:05:56 +00001088 def test_exist_ok_existing_directory(self):
1089 path = os.path.join(support.TESTFN, 'dir1')
1090 mode = 0o777
1091 old_mask = os.umask(0o022)
1092 os.makedirs(path, mode)
1093 self.assertRaises(OSError, os.makedirs, path, mode)
1094 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001095 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001096 os.makedirs(path, mode=mode, exist_ok=True)
1097 os.umask(old_mask)
1098
Martin Pantera82642f2015-11-19 04:48:44 +00001099 # Issue #25583: A drive root could raise PermissionError on Windows
1100 os.makedirs(os.path.abspath('/'), exist_ok=True)
1101
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001102 def test_exist_ok_s_isgid_directory(self):
1103 path = os.path.join(support.TESTFN, 'dir1')
1104 S_ISGID = stat.S_ISGID
1105 mode = 0o777
1106 old_mask = os.umask(0o022)
1107 try:
1108 existing_testfn_mode = stat.S_IMODE(
1109 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001110 try:
1111 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001112 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001113 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001114 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1115 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1116 # The os should apply S_ISGID from the parent dir for us, but
1117 # this test need not depend on that behavior. Be explicit.
1118 os.makedirs(path, mode | S_ISGID)
1119 # http://bugs.python.org/issue14992
1120 # Should not fail when the bit is already set.
1121 os.makedirs(path, mode, exist_ok=True)
1122 # remove the bit.
1123 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001124 # May work even when the bit is not already set when demanded.
1125 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001126 finally:
1127 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001128
1129 def test_exist_ok_existing_regular_file(self):
1130 base = support.TESTFN
1131 path = os.path.join(support.TESTFN, 'dir1')
1132 f = open(path, 'w')
1133 f.write('abc')
1134 f.close()
1135 self.assertRaises(OSError, os.makedirs, path)
1136 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1137 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1138 os.remove(path)
1139
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001140 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001141 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001142 'dir4', 'dir5', 'dir6')
1143 # If the tests failed, the bottom-most directory ('../dir6')
1144 # may not have been created, so we look for the outermost directory
1145 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001146 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001147 path = os.path.dirname(path)
1148
1149 os.removedirs(path)
1150
Andrew Svetlov405faed2012-12-25 12:18:09 +02001151
R David Murrayf2ad1732014-12-25 18:36:56 -05001152@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1153class ChownFileTests(unittest.TestCase):
1154
Berker Peksag036a71b2015-07-21 09:29:48 +03001155 @classmethod
1156 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001157 os.mkdir(support.TESTFN)
1158
1159 def test_chown_uid_gid_arguments_must_be_index(self):
1160 stat = os.stat(support.TESTFN)
1161 uid = stat.st_uid
1162 gid = stat.st_gid
1163 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1164 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1165 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1166 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1167 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1168
1169 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1170 def test_chown(self):
1171 gid_1, gid_2 = groups[:2]
1172 uid = os.stat(support.TESTFN).st_uid
1173 os.chown(support.TESTFN, uid, gid_1)
1174 gid = os.stat(support.TESTFN).st_gid
1175 self.assertEqual(gid, gid_1)
1176 os.chown(support.TESTFN, uid, gid_2)
1177 gid = os.stat(support.TESTFN).st_gid
1178 self.assertEqual(gid, gid_2)
1179
1180 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1181 "test needs root privilege and more than one user")
1182 def test_chown_with_root(self):
1183 uid_1, uid_2 = all_users[:2]
1184 gid = os.stat(support.TESTFN).st_gid
1185 os.chown(support.TESTFN, uid_1, gid)
1186 uid = os.stat(support.TESTFN).st_uid
1187 self.assertEqual(uid, uid_1)
1188 os.chown(support.TESTFN, uid_2, gid)
1189 uid = os.stat(support.TESTFN).st_uid
1190 self.assertEqual(uid, uid_2)
1191
1192 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1193 "test needs non-root account and more than one user")
1194 def test_chown_without_permission(self):
1195 uid_1, uid_2 = all_users[:2]
1196 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001197 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001198 os.chown(support.TESTFN, uid_1, gid)
1199 os.chown(support.TESTFN, uid_2, gid)
1200
Berker Peksag036a71b2015-07-21 09:29:48 +03001201 @classmethod
1202 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001203 os.rmdir(support.TESTFN)
1204
1205
Andrew Svetlov405faed2012-12-25 12:18:09 +02001206class RemoveDirsTests(unittest.TestCase):
1207 def setUp(self):
1208 os.makedirs(support.TESTFN)
1209
1210 def tearDown(self):
1211 support.rmtree(support.TESTFN)
1212
1213 def test_remove_all(self):
1214 dira = os.path.join(support.TESTFN, 'dira')
1215 os.mkdir(dira)
1216 dirb = os.path.join(dira, 'dirb')
1217 os.mkdir(dirb)
1218 os.removedirs(dirb)
1219 self.assertFalse(os.path.exists(dirb))
1220 self.assertFalse(os.path.exists(dira))
1221 self.assertFalse(os.path.exists(support.TESTFN))
1222
1223 def test_remove_partial(self):
1224 dira = os.path.join(support.TESTFN, 'dira')
1225 os.mkdir(dira)
1226 dirb = os.path.join(dira, 'dirb')
1227 os.mkdir(dirb)
1228 with open(os.path.join(dira, 'file.txt'), 'w') as f:
1229 f.write('text')
1230 os.removedirs(dirb)
1231 self.assertFalse(os.path.exists(dirb))
1232 self.assertTrue(os.path.exists(dira))
1233 self.assertTrue(os.path.exists(support.TESTFN))
1234
1235 def test_remove_nothing(self):
1236 dira = os.path.join(support.TESTFN, 'dira')
1237 os.mkdir(dira)
1238 dirb = os.path.join(dira, 'dirb')
1239 os.mkdir(dirb)
1240 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1241 f.write('text')
1242 with self.assertRaises(OSError):
1243 os.removedirs(dirb)
1244 self.assertTrue(os.path.exists(dirb))
1245 self.assertTrue(os.path.exists(dira))
1246 self.assertTrue(os.path.exists(support.TESTFN))
1247
1248
Guido van Rossume7ba4952007-06-06 23:52:48 +00001249class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001250 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001251 with open(os.devnull, 'wb') as f:
1252 f.write(b'hello')
1253 f.close()
1254 with open(os.devnull, 'rb') as f:
1255 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001256
Andrew Svetlov405faed2012-12-25 12:18:09 +02001257
Guido van Rossume7ba4952007-06-06 23:52:48 +00001258class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001259 def test_urandom_length(self):
1260 self.assertEqual(len(os.urandom(0)), 0)
1261 self.assertEqual(len(os.urandom(1)), 1)
1262 self.assertEqual(len(os.urandom(10)), 10)
1263 self.assertEqual(len(os.urandom(100)), 100)
1264 self.assertEqual(len(os.urandom(1000)), 1000)
1265
1266 def test_urandom_value(self):
1267 data1 = os.urandom(16)
1268 data2 = os.urandom(16)
1269 self.assertNotEqual(data1, data2)
1270
1271 def get_urandom_subprocess(self, count):
1272 code = '\n'.join((
1273 'import os, sys',
1274 'data = os.urandom(%s)' % count,
1275 'sys.stdout.buffer.write(data)',
1276 'sys.stdout.buffer.flush()'))
1277 out = assert_python_ok('-c', code)
1278 stdout = out[1]
1279 self.assertEqual(len(stdout), 16)
1280 return stdout
1281
1282 def test_urandom_subprocess(self):
1283 data1 = self.get_urandom_subprocess(16)
1284 data2 = self.get_urandom_subprocess(16)
1285 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001286
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001287
Victor Stinnerbae2d622015-10-01 09:47:30 +02001288# os.urandom() doesn't use a file descriptor when it is implemented with the
1289# getentropy() function, the getrandom() function or the getrandom() syscall
1290OS_URANDOM_DONT_USE_FD = (
1291 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1292 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1293 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001294
Victor Stinnerbae2d622015-10-01 09:47:30 +02001295@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1296 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001297class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001298 @unittest.skipUnless(resource, "test requires the resource module")
1299 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001300 # Check urandom() failing when it is not able to open /dev/random.
1301 # We spawn a new process to make the test more robust (if getrlimit()
1302 # failed to restore the file descriptor limit after this, the whole
1303 # test suite would crash; this actually happened on the OS X Tiger
1304 # buildbot).
1305 code = """if 1:
1306 import errno
1307 import os
1308 import resource
1309
1310 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1311 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1312 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001313 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001314 except OSError as e:
1315 assert e.errno == errno.EMFILE, e.errno
1316 else:
1317 raise AssertionError("OSError not raised")
1318 """
1319 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001320
Antoine Pitroue472aea2014-04-26 14:33:03 +02001321 def test_urandom_fd_closed(self):
1322 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1323 # closed.
1324 code = """if 1:
1325 import os
1326 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001327 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001328 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001329 with test.support.SuppressCrashReport():
1330 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001331 sys.stdout.buffer.write(os.urandom(4))
1332 """
1333 rc, out, err = assert_python_ok('-Sc', code)
1334
1335 def test_urandom_fd_reopened(self):
1336 # Issue #21207: urandom() should detect its fd to /dev/urandom
1337 # changed to something else, and reopen it.
1338 with open(support.TESTFN, 'wb') as f:
1339 f.write(b"x" * 256)
1340 self.addCleanup(os.unlink, support.TESTFN)
1341 code = """if 1:
1342 import os
1343 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001344 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001345 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001346 with test.support.SuppressCrashReport():
1347 for fd in range(3, 256):
1348 try:
1349 os.close(fd)
1350 except OSError:
1351 pass
1352 else:
1353 # Found the urandom fd (XXX hopefully)
1354 break
1355 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001356 with open({TESTFN!r}, 'rb') as f:
1357 os.dup2(f.fileno(), fd)
1358 sys.stdout.buffer.write(os.urandom(4))
1359 sys.stdout.buffer.write(os.urandom(4))
1360 """.format(TESTFN=support.TESTFN)
1361 rc, out, err = assert_python_ok('-Sc', code)
1362 self.assertEqual(len(out), 8)
1363 self.assertNotEqual(out[0:4], out[4:8])
1364 rc, out2, err2 = assert_python_ok('-Sc', code)
1365 self.assertEqual(len(out2), 8)
1366 self.assertNotEqual(out2, out)
1367
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001368
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001369@contextlib.contextmanager
1370def _execvpe_mockup(defpath=None):
1371 """
1372 Stubs out execv and execve functions when used as context manager.
1373 Records exec calls. The mock execv and execve functions always raise an
1374 exception as they would normally never return.
1375 """
1376 # A list of tuples containing (function name, first arg, args)
1377 # of calls to execv or execve that have been made.
1378 calls = []
1379
1380 def mock_execv(name, *args):
1381 calls.append(('execv', name, args))
1382 raise RuntimeError("execv called")
1383
1384 def mock_execve(name, *args):
1385 calls.append(('execve', name, args))
1386 raise OSError(errno.ENOTDIR, "execve called")
1387
1388 try:
1389 orig_execv = os.execv
1390 orig_execve = os.execve
1391 orig_defpath = os.defpath
1392 os.execv = mock_execv
1393 os.execve = mock_execve
1394 if defpath is not None:
1395 os.defpath = defpath
1396 yield calls
1397 finally:
1398 os.execv = orig_execv
1399 os.execve = orig_execve
1400 os.defpath = orig_defpath
1401
Guido van Rossume7ba4952007-06-06 23:52:48 +00001402class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001403 @unittest.skipIf(USING_LINUXTHREADS,
1404 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001405 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001406 self.assertRaises(OSError, os.execvpe, 'no such app-',
1407 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001408
Thomas Heller6790d602007-08-30 17:15:14 +00001409 def test_execvpe_with_bad_arglist(self):
1410 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1411
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001412 @unittest.skipUnless(hasattr(os, '_execvpe'),
1413 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001414 def _test_internal_execvpe(self, test_type):
1415 program_path = os.sep + 'absolutepath'
1416 if test_type is bytes:
1417 program = b'executable'
1418 fullpath = os.path.join(os.fsencode(program_path), program)
1419 native_fullpath = fullpath
1420 arguments = [b'progname', 'arg1', 'arg2']
1421 else:
1422 program = 'executable'
1423 arguments = ['progname', 'arg1', 'arg2']
1424 fullpath = os.path.join(program_path, program)
1425 if os.name != "nt":
1426 native_fullpath = os.fsencode(fullpath)
1427 else:
1428 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001429 env = {'spam': 'beans'}
1430
Victor Stinnerb745a742010-05-18 17:17:23 +00001431 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001432 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001433 self.assertRaises(RuntimeError,
1434 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001435 self.assertEqual(len(calls), 1)
1436 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1437
Victor Stinnerb745a742010-05-18 17:17:23 +00001438 # test os._execvpe() with a relative path:
1439 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001440 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001441 self.assertRaises(OSError,
1442 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001443 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001444 self.assertSequenceEqual(calls[0],
1445 ('execve', native_fullpath, (arguments, env)))
1446
1447 # test os._execvpe() with a relative path:
1448 # os.get_exec_path() reads the 'PATH' variable
1449 with _execvpe_mockup() as calls:
1450 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001451 if test_type is bytes:
1452 env_path[b'PATH'] = program_path
1453 else:
1454 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001455 self.assertRaises(OSError,
1456 os._execvpe, program, arguments, env=env_path)
1457 self.assertEqual(len(calls), 1)
1458 self.assertSequenceEqual(calls[0],
1459 ('execve', native_fullpath, (arguments, env_path)))
1460
1461 def test_internal_execvpe_str(self):
1462 self._test_internal_execvpe(str)
1463 if os.name != "nt":
1464 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001465
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001466
Serhiy Storchaka43767632013-11-03 21:31:38 +02001467@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001468class Win32ErrorTests(unittest.TestCase):
1469 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001470 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001471
1472 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001473 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001474
1475 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001476 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001477
1478 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001479 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001480 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001481 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001482 finally:
1483 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001484 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001485
1486 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001487 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001488
Thomas Wouters477c8d52006-05-27 19:21:47 +00001489 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001490 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001491
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001492class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001493 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001494 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1495 #singles.append("close")
Martin Panter99e843b2016-09-10 10:38:28 +00001496 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001497 def get_single(f):
1498 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001499 if hasattr(os, f):
1500 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001501 return helper
1502 for f in singles:
1503 locals()["test_"+f] = get_single(f)
1504
Benjamin Peterson7522c742009-01-19 21:00:09 +00001505 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001506 try:
1507 f(support.make_bad_fd(), *args)
1508 except OSError as e:
1509 self.assertEqual(e.errno, errno.EBADF)
1510 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001511 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001512 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001513
Serhiy Storchaka43767632013-11-03 21:31:38 +02001514 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001515 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001516 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001517
Serhiy Storchaka43767632013-11-03 21:31:38 +02001518 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001519 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001520 fd = support.make_bad_fd()
1521 # Make sure none of the descriptors we are about to close are
1522 # currently valid (issue 6542).
1523 for i in range(10):
1524 try: os.fstat(fd+i)
1525 except OSError:
1526 pass
1527 else:
1528 break
1529 if i < 2:
1530 raise unittest.SkipTest(
1531 "Unable to acquire a range of invalid file descriptors")
1532 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001533
Serhiy Storchaka43767632013-11-03 21:31:38 +02001534 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001535 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001536 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001537
Serhiy Storchaka43767632013-11-03 21:31:38 +02001538 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001539 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001540 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001541
Serhiy Storchaka43767632013-11-03 21:31:38 +02001542 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001543 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001544 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001545
Serhiy Storchaka43767632013-11-03 21:31:38 +02001546 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001547 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001548 self.check(os.pathconf, "PC_NAME_MAX")
1549 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001550
Serhiy Storchaka43767632013-11-03 21:31:38 +02001551 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001552 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001553 self.check(os.truncate, 0)
1554 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001555
Serhiy Storchaka43767632013-11-03 21:31:38 +02001556 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001557 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001558 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001559
Serhiy Storchaka43767632013-11-03 21:31:38 +02001560 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001561 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001562 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001563
Victor Stinner57ddf782014-01-08 15:21:28 +01001564 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1565 def test_readv(self):
1566 buf = bytearray(10)
1567 self.check(os.readv, [buf])
1568
Serhiy Storchaka43767632013-11-03 21:31:38 +02001569 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001570 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001571 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001572
Serhiy Storchaka43767632013-11-03 21:31:38 +02001573 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001574 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001575 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001576
Victor Stinner57ddf782014-01-08 15:21:28 +01001577 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1578 def test_writev(self):
1579 self.check(os.writev, [b'abc'])
1580
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001581 def test_inheritable(self):
1582 self.check(os.get_inheritable)
1583 self.check(os.set_inheritable, True)
1584
1585 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1586 'needs os.get_blocking() and os.set_blocking()')
1587 def test_blocking(self):
1588 self.check(os.get_blocking)
1589 self.check(os.set_blocking, True)
1590
Brian Curtin1b9df392010-11-24 20:24:31 +00001591
1592class LinkTests(unittest.TestCase):
1593 def setUp(self):
1594 self.file1 = support.TESTFN
1595 self.file2 = os.path.join(support.TESTFN + "2")
1596
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001597 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001598 for file in (self.file1, self.file2):
1599 if os.path.exists(file):
1600 os.unlink(file)
1601
Brian Curtin1b9df392010-11-24 20:24:31 +00001602 def _test_link(self, file1, file2):
1603 with open(file1, "w") as f1:
1604 f1.write("test")
1605
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001606 with warnings.catch_warnings():
1607 warnings.simplefilter("ignore", DeprecationWarning)
1608 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001609 with open(file1, "r") as f1, open(file2, "r") as f2:
1610 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1611
1612 def test_link(self):
1613 self._test_link(self.file1, self.file2)
1614
1615 def test_link_bytes(self):
1616 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1617 bytes(self.file2, sys.getfilesystemencoding()))
1618
Brian Curtinf498b752010-11-30 15:54:04 +00001619 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001620 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001621 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001622 except UnicodeError:
1623 raise unittest.SkipTest("Unable to encode for this platform.")
1624
Brian Curtinf498b752010-11-30 15:54:04 +00001625 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001626 self.file2 = self.file1 + "2"
1627 self._test_link(self.file1, self.file2)
1628
Serhiy Storchaka43767632013-11-03 21:31:38 +02001629@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1630class PosixUidGidTests(unittest.TestCase):
1631 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1632 def test_setuid(self):
1633 if os.getuid() != 0:
1634 self.assertRaises(OSError, os.setuid, 0)
1635 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001636
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1638 def test_setgid(self):
1639 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1640 self.assertRaises(OSError, os.setgid, 0)
1641 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001642
Serhiy Storchaka43767632013-11-03 21:31:38 +02001643 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1644 def test_seteuid(self):
1645 if os.getuid() != 0:
1646 self.assertRaises(OSError, os.seteuid, 0)
1647 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001648
Serhiy Storchaka43767632013-11-03 21:31:38 +02001649 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1650 def test_setegid(self):
1651 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1652 self.assertRaises(OSError, os.setegid, 0)
1653 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001654
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1656 def test_setreuid(self):
1657 if os.getuid() != 0:
1658 self.assertRaises(OSError, os.setreuid, 0, 0)
1659 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1660 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001661
Serhiy Storchaka43767632013-11-03 21:31:38 +02001662 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1663 def test_setreuid_neg1(self):
1664 # Needs to accept -1. We run this in a subprocess to avoid
1665 # altering the test runner's process state (issue8045).
1666 subprocess.check_call([
1667 sys.executable, '-c',
1668 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001669
Serhiy Storchaka43767632013-11-03 21:31:38 +02001670 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1671 def test_setregid(self):
1672 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1673 self.assertRaises(OSError, os.setregid, 0, 0)
1674 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1675 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1678 def test_setregid_neg1(self):
1679 # Needs to accept -1. We run this in a subprocess to avoid
1680 # altering the test runner's process state (issue8045).
1681 subprocess.check_call([
1682 sys.executable, '-c',
1683 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001684
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1686class Pep383Tests(unittest.TestCase):
1687 def setUp(self):
1688 if support.TESTFN_UNENCODABLE:
1689 self.dir = support.TESTFN_UNENCODABLE
1690 elif support.TESTFN_NONASCII:
1691 self.dir = support.TESTFN_NONASCII
1692 else:
1693 self.dir = support.TESTFN
1694 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001695
Serhiy Storchaka43767632013-11-03 21:31:38 +02001696 bytesfn = []
1697 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001698 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001699 fn = os.fsencode(fn)
1700 except UnicodeEncodeError:
1701 return
1702 bytesfn.append(fn)
1703 add_filename(support.TESTFN_UNICODE)
1704 if support.TESTFN_UNENCODABLE:
1705 add_filename(support.TESTFN_UNENCODABLE)
1706 if support.TESTFN_NONASCII:
1707 add_filename(support.TESTFN_NONASCII)
1708 if not bytesfn:
1709 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001710
Serhiy Storchaka43767632013-11-03 21:31:38 +02001711 self.unicodefn = set()
1712 os.mkdir(self.dir)
1713 try:
1714 for fn in bytesfn:
1715 support.create_empty_file(os.path.join(self.bdir, fn))
1716 fn = os.fsdecode(fn)
1717 if fn in self.unicodefn:
1718 raise ValueError("duplicate filename")
1719 self.unicodefn.add(fn)
1720 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001721 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001722 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001723
Serhiy Storchaka43767632013-11-03 21:31:38 +02001724 def tearDown(self):
1725 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001726
Serhiy Storchaka43767632013-11-03 21:31:38 +02001727 def test_listdir(self):
1728 expected = self.unicodefn
1729 found = set(os.listdir(self.dir))
1730 self.assertEqual(found, expected)
1731 # test listdir without arguments
1732 current_directory = os.getcwd()
1733 try:
1734 os.chdir(os.sep)
1735 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1736 finally:
1737 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001738
Serhiy Storchaka43767632013-11-03 21:31:38 +02001739 def test_open(self):
1740 for fn in self.unicodefn:
1741 f = open(os.path.join(self.dir, fn), 'rb')
1742 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001743
Serhiy Storchaka43767632013-11-03 21:31:38 +02001744 @unittest.skipUnless(hasattr(os, 'statvfs'),
1745 "need os.statvfs()")
1746 def test_statvfs(self):
1747 # issue #9645
1748 for fn in self.unicodefn:
1749 # should not fail with file not found error
1750 fullname = os.path.join(self.dir, fn)
1751 os.statvfs(fullname)
1752
1753 def test_stat(self):
1754 for fn in self.unicodefn:
1755 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001756
Brian Curtineb24d742010-04-12 17:16:38 +00001757@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1758class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001759 def _kill(self, sig):
1760 # Start sys.executable as a subprocess and communicate from the
1761 # subprocess to the parent that the interpreter is ready. When it
1762 # becomes ready, send *sig* via os.kill to the subprocess and check
1763 # that the return code is equal to *sig*.
1764 import ctypes
1765 from ctypes import wintypes
1766 import msvcrt
1767
1768 # Since we can't access the contents of the process' stdout until the
1769 # process has exited, use PeekNamedPipe to see what's inside stdout
1770 # without waiting. This is done so we can tell that the interpreter
1771 # is started and running at a point where it could handle a signal.
1772 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1773 PeekNamedPipe.restype = wintypes.BOOL
1774 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1775 ctypes.POINTER(ctypes.c_char), # stdout buf
1776 wintypes.DWORD, # Buffer size
1777 ctypes.POINTER(wintypes.DWORD), # bytes read
1778 ctypes.POINTER(wintypes.DWORD), # bytes avail
1779 ctypes.POINTER(wintypes.DWORD)) # bytes left
1780 msg = "running"
1781 proc = subprocess.Popen([sys.executable, "-c",
1782 "import sys;"
1783 "sys.stdout.write('{}');"
1784 "sys.stdout.flush();"
1785 "input()".format(msg)],
1786 stdout=subprocess.PIPE,
1787 stderr=subprocess.PIPE,
1788 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001789 self.addCleanup(proc.stdout.close)
1790 self.addCleanup(proc.stderr.close)
1791 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001792
1793 count, max = 0, 100
1794 while count < max and proc.poll() is None:
1795 # Create a string buffer to store the result of stdout from the pipe
1796 buf = ctypes.create_string_buffer(len(msg))
1797 # Obtain the text currently in proc.stdout
1798 # Bytes read/avail/left are left as NULL and unused
1799 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1800 buf, ctypes.sizeof(buf), None, None, None)
1801 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1802 if buf.value:
1803 self.assertEqual(msg, buf.value.decode())
1804 break
1805 time.sleep(0.1)
1806 count += 1
1807 else:
1808 self.fail("Did not receive communication from the subprocess")
1809
Brian Curtineb24d742010-04-12 17:16:38 +00001810 os.kill(proc.pid, sig)
1811 self.assertEqual(proc.wait(), sig)
1812
1813 def test_kill_sigterm(self):
1814 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001815 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001816
1817 def test_kill_int(self):
1818 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001819 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001820
1821 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001822 tagname = "test_os_%s" % uuid.uuid1()
1823 m = mmap.mmap(-1, 1, tagname)
1824 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001825 # Run a script which has console control handling enabled.
1826 proc = subprocess.Popen([sys.executable,
1827 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001828 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001829 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1830 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001831 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001832 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001833 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001834 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001835 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001836 count += 1
1837 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001838 # Forcefully kill the process if we weren't able to signal it.
1839 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001840 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001841 os.kill(proc.pid, event)
1842 # proc.send_signal(event) could also be done here.
1843 # Allow time for the signal to be passed and the process to exit.
1844 time.sleep(0.5)
1845 if not proc.poll():
1846 # Forcefully kill the process if we weren't able to signal it.
1847 os.kill(proc.pid, signal.SIGINT)
1848 self.fail("subprocess did not stop on {}".format(name))
1849
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001850 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001851 def test_CTRL_C_EVENT(self):
1852 from ctypes import wintypes
1853 import ctypes
1854
1855 # Make a NULL value by creating a pointer with no argument.
1856 NULL = ctypes.POINTER(ctypes.c_int)()
1857 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1858 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1859 wintypes.BOOL)
1860 SetConsoleCtrlHandler.restype = wintypes.BOOL
1861
1862 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001863 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001864 # by subprocesses.
1865 SetConsoleCtrlHandler(NULL, 0)
1866
1867 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1868
1869 def test_CTRL_BREAK_EVENT(self):
1870 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1871
1872
Brian Curtind40e6f72010-07-08 21:39:08 +00001873@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001874class Win32ListdirTests(unittest.TestCase):
1875 """Test listdir on Windows."""
1876
1877 def setUp(self):
1878 self.created_paths = []
1879 for i in range(2):
1880 dir_name = 'SUB%d' % i
1881 dir_path = os.path.join(support.TESTFN, dir_name)
1882 file_name = 'FILE%d' % i
1883 file_path = os.path.join(support.TESTFN, file_name)
1884 os.makedirs(dir_path)
1885 with open(file_path, 'w') as f:
1886 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1887 self.created_paths.extend([dir_name, file_name])
1888 self.created_paths.sort()
1889
1890 def tearDown(self):
1891 shutil.rmtree(support.TESTFN)
1892
1893 def test_listdir_no_extended_path(self):
1894 """Test when the path is not an "extended" path."""
1895 # unicode
1896 self.assertEqual(
1897 sorted(os.listdir(support.TESTFN)),
1898 self.created_paths)
1899 # bytes
1900 self.assertEqual(
1901 sorted(os.listdir(os.fsencode(support.TESTFN))),
1902 [os.fsencode(path) for path in self.created_paths])
1903
1904 def test_listdir_extended_path(self):
1905 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001906 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001907 # unicode
1908 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1909 self.assertEqual(
1910 sorted(os.listdir(path)),
1911 self.created_paths)
1912 # bytes
1913 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1914 self.assertEqual(
1915 sorted(os.listdir(path)),
1916 [os.fsencode(path) for path in self.created_paths])
1917
1918
1919@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001920@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001921class Win32SymlinkTests(unittest.TestCase):
1922 filelink = 'filelinktest'
1923 filelink_target = os.path.abspath(__file__)
1924 dirlink = 'dirlinktest'
1925 dirlink_target = os.path.dirname(filelink_target)
1926 missing_link = 'missing link'
1927
1928 def setUp(self):
1929 assert os.path.exists(self.dirlink_target)
1930 assert os.path.exists(self.filelink_target)
1931 assert not os.path.exists(self.dirlink)
1932 assert not os.path.exists(self.filelink)
1933 assert not os.path.exists(self.missing_link)
1934
1935 def tearDown(self):
1936 if os.path.exists(self.filelink):
1937 os.remove(self.filelink)
1938 if os.path.exists(self.dirlink):
1939 os.rmdir(self.dirlink)
1940 if os.path.lexists(self.missing_link):
1941 os.remove(self.missing_link)
1942
1943 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001944 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001945 self.assertTrue(os.path.exists(self.dirlink))
1946 self.assertTrue(os.path.isdir(self.dirlink))
1947 self.assertTrue(os.path.islink(self.dirlink))
1948 self.check_stat(self.dirlink, self.dirlink_target)
1949
1950 def test_file_link(self):
1951 os.symlink(self.filelink_target, self.filelink)
1952 self.assertTrue(os.path.exists(self.filelink))
1953 self.assertTrue(os.path.isfile(self.filelink))
1954 self.assertTrue(os.path.islink(self.filelink))
1955 self.check_stat(self.filelink, self.filelink_target)
1956
1957 def _create_missing_dir_link(self):
1958 'Create a "directory" link to a non-existent target'
1959 linkname = self.missing_link
1960 if os.path.lexists(linkname):
1961 os.remove(linkname)
1962 target = r'c:\\target does not exist.29r3c740'
1963 assert not os.path.exists(target)
1964 target_is_dir = True
1965 os.symlink(target, linkname, target_is_dir)
1966
1967 def test_remove_directory_link_to_missing_target(self):
1968 self._create_missing_dir_link()
1969 # For compatibility with Unix, os.remove will check the
1970 # directory status and call RemoveDirectory if the symlink
1971 # was created with target_is_dir==True.
1972 os.remove(self.missing_link)
1973
1974 @unittest.skip("currently fails; consider for improvement")
1975 def test_isdir_on_directory_link_to_missing_target(self):
1976 self._create_missing_dir_link()
1977 # consider having isdir return true for directory links
1978 self.assertTrue(os.path.isdir(self.missing_link))
1979
1980 @unittest.skip("currently fails; consider for improvement")
1981 def test_rmdir_on_directory_link_to_missing_target(self):
1982 self._create_missing_dir_link()
1983 # consider allowing rmdir to remove directory links
1984 os.rmdir(self.missing_link)
1985
1986 def check_stat(self, link, target):
1987 self.assertEqual(os.stat(link), os.stat(target))
1988 self.assertNotEqual(os.lstat(link), os.stat(link))
1989
Brian Curtind25aef52011-06-13 15:16:04 -05001990 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001991 with warnings.catch_warnings():
1992 warnings.simplefilter("ignore", DeprecationWarning)
1993 self.assertEqual(os.stat(bytes_link), os.stat(target))
1994 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001995
1996 def test_12084(self):
1997 level1 = os.path.abspath(support.TESTFN)
1998 level2 = os.path.join(level1, "level2")
1999 level3 = os.path.join(level2, "level3")
2000 try:
2001 os.mkdir(level1)
2002 os.mkdir(level2)
2003 os.mkdir(level3)
2004
2005 file1 = os.path.abspath(os.path.join(level1, "file1"))
2006
2007 with open(file1, "w") as f:
2008 f.write("file1")
2009
2010 orig_dir = os.getcwd()
2011 try:
2012 os.chdir(level2)
2013 link = os.path.join(level2, "link")
2014 os.symlink(os.path.relpath(file1), "link")
2015 self.assertIn("link", os.listdir(os.getcwd()))
2016
2017 # Check os.stat calls from the same dir as the link
2018 self.assertEqual(os.stat(file1), os.stat("link"))
2019
2020 # Check os.stat calls from a dir below the link
2021 os.chdir(level1)
2022 self.assertEqual(os.stat(file1),
2023 os.stat(os.path.relpath(link)))
2024
2025 # Check os.stat calls from a dir above the link
2026 os.chdir(level3)
2027 self.assertEqual(os.stat(file1),
2028 os.stat(os.path.relpath(link)))
2029 finally:
2030 os.chdir(orig_dir)
2031 except OSError as err:
2032 self.fail(err)
2033 finally:
2034 os.remove(file1)
2035 shutil.rmtree(level1)
2036
Brian Curtind40e6f72010-07-08 21:39:08 +00002037
Tim Golden0321cf22014-05-05 19:46:17 +01002038@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2039class Win32JunctionTests(unittest.TestCase):
2040 junction = 'junctiontest'
2041 junction_target = os.path.dirname(os.path.abspath(__file__))
2042
2043 def setUp(self):
2044 assert os.path.exists(self.junction_target)
2045 assert not os.path.exists(self.junction)
2046
2047 def tearDown(self):
2048 if os.path.exists(self.junction):
2049 # os.rmdir delegates to Windows' RemoveDirectoryW,
2050 # which removes junction points safely.
2051 os.rmdir(self.junction)
2052
2053 def test_create_junction(self):
2054 _winapi.CreateJunction(self.junction_target, self.junction)
2055 self.assertTrue(os.path.exists(self.junction))
2056 self.assertTrue(os.path.isdir(self.junction))
2057
2058 # Junctions are not recognized as links.
2059 self.assertFalse(os.path.islink(self.junction))
2060
2061 def test_unlink_removes_junction(self):
2062 _winapi.CreateJunction(self.junction_target, self.junction)
2063 self.assertTrue(os.path.exists(self.junction))
2064
2065 os.unlink(self.junction)
2066 self.assertFalse(os.path.exists(self.junction))
2067
2068
Jason R. Coombs3a092862013-05-27 23:21:28 -04002069@support.skip_unless_symlink
2070class NonLocalSymlinkTests(unittest.TestCase):
2071
2072 def setUp(self):
2073 """
2074 Create this structure:
2075
2076 base
2077 \___ some_dir
2078 """
2079 os.makedirs('base/some_dir')
2080
2081 def tearDown(self):
2082 shutil.rmtree('base')
2083
2084 def test_directory_link_nonlocal(self):
2085 """
2086 The symlink target should resolve relative to the link, not relative
2087 to the current directory.
2088
2089 Then, link base/some_link -> base/some_dir and ensure that some_link
2090 is resolved as a directory.
2091
2092 In issue13772, it was discovered that directory detection failed if
2093 the symlink target was not specified relative to the current
2094 directory, which was a defect in the implementation.
2095 """
2096 src = os.path.join('base', 'some_link')
2097 os.symlink('some_dir', src)
2098 assert os.path.isdir(src)
2099
2100
Victor Stinnere8d51452010-08-19 01:05:19 +00002101class FSEncodingTests(unittest.TestCase):
2102 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002103 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2104 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002105
Victor Stinnere8d51452010-08-19 01:05:19 +00002106 def test_identity(self):
2107 # assert fsdecode(fsencode(x)) == x
2108 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2109 try:
2110 bytesfn = os.fsencode(fn)
2111 except UnicodeEncodeError:
2112 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002113 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002114
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002115
Brett Cannonefb00c02012-02-29 18:31:31 -05002116
2117class DeviceEncodingTests(unittest.TestCase):
2118
2119 def test_bad_fd(self):
2120 # Return None when an fd doesn't actually exist.
2121 self.assertIsNone(os.device_encoding(123456))
2122
Philip Jenveye308b7c2012-02-29 16:16:15 -08002123 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2124 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002125 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002126 def test_device_encoding(self):
2127 encoding = os.device_encoding(0)
2128 self.assertIsNotNone(encoding)
2129 self.assertTrue(codecs.lookup(encoding))
2130
2131
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002132class PidTests(unittest.TestCase):
2133 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2134 def test_getppid(self):
2135 p = subprocess.Popen([sys.executable, '-c',
2136 'import os; print(os.getppid())'],
2137 stdout=subprocess.PIPE)
2138 stdout, _ = p.communicate()
2139 # We are the parent of our subprocess
2140 self.assertEqual(int(stdout), os.getpid())
2141
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002142 def test_waitpid(self):
2143 args = [sys.executable, '-c', 'pass']
2144 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2145 status = os.waitpid(pid, 0)
2146 self.assertEqual(status, (pid, 0))
2147
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002148
Brian Curtin0151b8e2010-09-24 13:43:43 +00002149# The introduction of this TestCase caused at least two different errors on
2150# *nix buildbots. Temporarily skip this to let the buildbots move along.
2151@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002152@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2153class LoginTests(unittest.TestCase):
2154 def test_getlogin(self):
2155 user_name = os.getlogin()
2156 self.assertNotEqual(len(user_name), 0)
2157
2158
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002159@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2160 "needs os.getpriority and os.setpriority")
2161class ProgramPriorityTests(unittest.TestCase):
2162 """Tests for os.getpriority() and os.setpriority()."""
2163
2164 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002165
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002166 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2167 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2168 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002169 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2170 if base >= 19 and new_prio <= 19:
2171 raise unittest.SkipTest(
2172 "unable to reliably test setpriority at current nice level of %s" % base)
2173 else:
2174 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002175 finally:
2176 try:
2177 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2178 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002179 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002180 raise
2181
2182
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002183if threading is not None:
2184 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002185
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002186 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002187
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002188 def __init__(self, conn):
2189 asynchat.async_chat.__init__(self, conn)
2190 self.in_buffer = []
2191 self.closed = False
2192 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002193
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002194 def handle_read(self):
2195 data = self.recv(4096)
2196 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002197
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002198 def get_data(self):
2199 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002200
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002201 def handle_close(self):
2202 self.close()
2203 self.closed = True
2204
2205 def handle_error(self):
2206 raise
2207
2208 def __init__(self, address):
2209 threading.Thread.__init__(self)
2210 asyncore.dispatcher.__init__(self)
2211 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2212 self.bind(address)
2213 self.listen(5)
2214 self.host, self.port = self.socket.getsockname()[:2]
2215 self.handler_instance = None
2216 self._active = False
2217 self._active_lock = threading.Lock()
2218
2219 # --- public API
2220
2221 @property
2222 def running(self):
2223 return self._active
2224
2225 def start(self):
2226 assert not self.running
2227 self.__flag = threading.Event()
2228 threading.Thread.start(self)
2229 self.__flag.wait()
2230
2231 def stop(self):
2232 assert self.running
2233 self._active = False
2234 self.join()
2235
2236 def wait(self):
2237 # wait for handler connection to be closed, then stop the server
2238 while not getattr(self.handler_instance, "closed", False):
2239 time.sleep(0.001)
2240 self.stop()
2241
2242 # --- internals
2243
2244 def run(self):
2245 self._active = True
2246 self.__flag.set()
2247 while self._active and asyncore.socket_map:
2248 self._active_lock.acquire()
2249 asyncore.loop(timeout=0.001, count=1)
2250 self._active_lock.release()
2251 asyncore.close_all()
2252
2253 def handle_accept(self):
2254 conn, addr = self.accept()
2255 self.handler_instance = self.Handler(conn)
2256
2257 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002258 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002259 handle_read = handle_connect
2260
2261 def writable(self):
2262 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002263
2264 def handle_error(self):
2265 raise
2266
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002267
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002268@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002269@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2270class TestSendfile(unittest.TestCase):
2271
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002272 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002273 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002274 not sys.platform.startswith("solaris") and \
2275 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002276 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2277 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002278
2279 @classmethod
2280 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002281 cls.key = support.threading_setup()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002282 with open(support.TESTFN, "wb") as f:
2283 f.write(cls.DATA)
2284
2285 @classmethod
2286 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002287 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002288 support.unlink(support.TESTFN)
2289
2290 def setUp(self):
2291 self.server = SendfileTestServer((support.HOST, 0))
2292 self.server.start()
2293 self.client = socket.socket()
2294 self.client.connect((self.server.host, self.server.port))
2295 self.client.settimeout(1)
2296 # synchronize by waiting for "220 ready" response
2297 self.client.recv(1024)
2298 self.sockno = self.client.fileno()
2299 self.file = open(support.TESTFN, 'rb')
2300 self.fileno = self.file.fileno()
2301
2302 def tearDown(self):
2303 self.file.close()
2304 self.client.close()
2305 if self.server.running:
2306 self.server.stop()
2307
2308 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2309 """A higher level wrapper representing how an application is
2310 supposed to use sendfile().
2311 """
2312 while 1:
2313 try:
2314 if self.SUPPORT_HEADERS_TRAILERS:
2315 return os.sendfile(sock, file, offset, nbytes, headers,
2316 trailers)
2317 else:
2318 return os.sendfile(sock, file, offset, nbytes)
2319 except OSError as err:
2320 if err.errno == errno.ECONNRESET:
2321 # disconnected
2322 raise
2323 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2324 # we have to retry send data
2325 continue
2326 else:
2327 raise
2328
2329 def test_send_whole_file(self):
2330 # normal send
2331 total_sent = 0
2332 offset = 0
2333 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002334 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002335 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2336 if sent == 0:
2337 break
2338 offset += sent
2339 total_sent += sent
2340 self.assertTrue(sent <= nbytes)
2341 self.assertEqual(offset, total_sent)
2342
2343 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002344 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002345 self.client.close()
2346 self.server.wait()
2347 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002348 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002349 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002350
2351 def test_send_at_certain_offset(self):
2352 # start sending a file at a certain offset
2353 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002354 offset = len(self.DATA) // 2
2355 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002356 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002357 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002358 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2359 if sent == 0:
2360 break
2361 offset += sent
2362 total_sent += sent
2363 self.assertTrue(sent <= nbytes)
2364
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002365 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002366 self.client.close()
2367 self.server.wait()
2368 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002369 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002370 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002371 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002372 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002373
2374 def test_offset_overflow(self):
2375 # specify an offset > file size
2376 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002377 try:
2378 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2379 except OSError as e:
2380 # Solaris can raise EINVAL if offset >= file length, ignore.
2381 if e.errno != errno.EINVAL:
2382 raise
2383 else:
2384 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002385 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002386 self.client.close()
2387 self.server.wait()
2388 data = self.server.handler_instance.get_data()
2389 self.assertEqual(data, b'')
2390
2391 def test_invalid_offset(self):
2392 with self.assertRaises(OSError) as cm:
2393 os.sendfile(self.sockno, self.fileno, -1, 4096)
2394 self.assertEqual(cm.exception.errno, errno.EINVAL)
2395
Martin Panterbf19d162015-09-09 01:01:13 +00002396 def test_keywords(self):
2397 # Keyword arguments should be supported
2398 os.sendfile(out=self.sockno, offset=0, count=4096,
2399 **{'in': self.fileno})
2400 if self.SUPPORT_HEADERS_TRAILERS:
2401 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002402 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002403
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002404 # --- headers / trailers tests
2405
Serhiy Storchaka43767632013-11-03 21:31:38 +02002406 @requires_headers_trailers
2407 def test_headers(self):
2408 total_sent = 0
2409 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2410 headers=[b"x" * 512])
2411 total_sent += sent
2412 offset = 4096
2413 nbytes = 4096
2414 while 1:
2415 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2416 offset, nbytes)
2417 if sent == 0:
2418 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002419 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002420 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002421
Serhiy Storchaka43767632013-11-03 21:31:38 +02002422 expected_data = b"x" * 512 + self.DATA
2423 self.assertEqual(total_sent, len(expected_data))
2424 self.client.close()
2425 self.server.wait()
2426 data = self.server.handler_instance.get_data()
2427 self.assertEqual(hash(data), hash(expected_data))
2428
2429 @requires_headers_trailers
2430 def test_trailers(self):
2431 TESTFN2 = support.TESTFN + "2"
2432 file_data = b"abcdef"
2433 with open(TESTFN2, 'wb') as f:
2434 f.write(file_data)
2435 with open(TESTFN2, 'rb')as f:
2436 self.addCleanup(os.remove, TESTFN2)
2437 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2438 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002439 self.client.close()
2440 self.server.wait()
2441 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002442 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002443
Serhiy Storchaka43767632013-11-03 21:31:38 +02002444 @requires_headers_trailers
2445 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2446 'test needs os.SF_NODISKIO')
2447 def test_flags(self):
2448 try:
2449 os.sendfile(self.sockno, self.fileno, 0, 4096,
2450 flags=os.SF_NODISKIO)
2451 except OSError as err:
2452 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2453 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002454
2455
Larry Hastings9cf065c2012-06-22 16:30:09 -07002456def supports_extended_attributes():
2457 if not hasattr(os, "setxattr"):
2458 return False
2459 try:
2460 with open(support.TESTFN, "wb") as fp:
2461 try:
2462 os.setxattr(fp.fileno(), b"user.test", b"")
2463 except OSError:
2464 return False
2465 finally:
2466 support.unlink(support.TESTFN)
2467 # Kernels < 2.6.39 don't respect setxattr flags.
2468 kernel_version = platform.release()
2469 m = re.match("2.6.(\d{1,2})", kernel_version)
2470 return m is None or int(m.group(1)) >= 39
2471
2472
2473@unittest.skipUnless(supports_extended_attributes(),
2474 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002475class ExtendedAttributeTests(unittest.TestCase):
2476
2477 def tearDown(self):
2478 support.unlink(support.TESTFN)
2479
Larry Hastings9cf065c2012-06-22 16:30:09 -07002480 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002481 fn = support.TESTFN
2482 open(fn, "wb").close()
2483 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002484 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002485 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002486 init_xattr = listxattr(fn)
2487 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002488 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002489 xattr = set(init_xattr)
2490 xattr.add("user.test")
2491 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002492 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2493 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2494 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002495 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002496 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002497 self.assertEqual(cm.exception.errno, errno.EEXIST)
2498 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002499 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002500 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002501 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002502 xattr.add("user.test2")
2503 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002504 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002505 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002506 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002507 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002508 xattr.remove("user.test")
2509 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002510 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2511 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2512 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2513 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002514 many = sorted("user.test{}".format(i) for i in range(100))
2515 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002516 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002517 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002518
Larry Hastings9cf065c2012-06-22 16:30:09 -07002519 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002520 def make_bytes(s):
2521 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002522 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002523 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002524 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002525
2526 def test_simple(self):
2527 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2528 os.listxattr)
2529
2530 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002531 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2532 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002533
2534 def test_fds(self):
2535 def getxattr(path, *args):
2536 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002537 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002538 def setxattr(path, *args):
2539 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002540 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002541 def removexattr(path, *args):
2542 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002543 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002544 def listxattr(path, *args):
2545 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002546 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002547 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2548
2549
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002550@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2551class Win32DeprecatedBytesAPI(unittest.TestCase):
2552 def test_deprecated(self):
2553 import nt
2554 filename = os.fsencode(support.TESTFN)
2555 with warnings.catch_warnings():
2556 warnings.simplefilter("error", DeprecationWarning)
2557 for func, *args in (
2558 (nt._getfullpathname, filename),
2559 (nt._isdir, filename),
2560 (os.access, filename, os.R_OK),
2561 (os.chdir, filename),
2562 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002563 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002564 (os.link, filename, filename),
2565 (os.listdir, filename),
2566 (os.lstat, filename),
2567 (os.mkdir, filename),
2568 (os.open, filename, os.O_RDONLY),
2569 (os.rename, filename, filename),
2570 (os.rmdir, filename),
2571 (os.startfile, filename),
2572 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002573 (os.unlink, filename),
2574 (os.utime, filename),
2575 ):
2576 self.assertRaises(DeprecationWarning, func, *args)
2577
Victor Stinner28216442011-11-16 00:34:44 +01002578 @support.skip_unless_symlink
2579 def test_symlink(self):
2580 filename = os.fsencode(support.TESTFN)
2581 with warnings.catch_warnings():
2582 warnings.simplefilter("error", DeprecationWarning)
2583 self.assertRaises(DeprecationWarning,
2584 os.symlink, filename, filename)
2585
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002586
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002587@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2588class TermsizeTests(unittest.TestCase):
2589 def test_does_not_crash(self):
2590 """Check if get_terminal_size() returns a meaningful value.
2591
2592 There's no easy portable way to actually check the size of the
2593 terminal, so let's check if it returns something sensible instead.
2594 """
2595 try:
2596 size = os.get_terminal_size()
2597 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002598 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002599 # Under win32 a generic OSError can be thrown if the
2600 # handle cannot be retrieved
2601 self.skipTest("failed to query terminal size")
2602 raise
2603
Antoine Pitroucfade362012-02-08 23:48:59 +01002604 self.assertGreaterEqual(size.columns, 0)
2605 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002606
2607 def test_stty_match(self):
2608 """Check if stty returns the same results
2609
2610 stty actually tests stdin, so get_terminal_size is invoked on
2611 stdin explicitly. If stty succeeded, then get_terminal_size()
2612 should work too.
2613 """
2614 try:
2615 size = subprocess.check_output(['stty', 'size']).decode().split()
2616 except (FileNotFoundError, subprocess.CalledProcessError):
2617 self.skipTest("stty invocation failed")
2618 expected = (int(size[1]), int(size[0])) # reversed order
2619
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002620 try:
2621 actual = os.get_terminal_size(sys.__stdin__.fileno())
2622 except OSError as e:
2623 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2624 # Under win32 a generic OSError can be thrown if the
2625 # handle cannot be retrieved
2626 self.skipTest("failed to query terminal size")
2627 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002628 self.assertEqual(expected, actual)
2629
2630
Victor Stinner292c8352012-10-30 02:17:38 +01002631class OSErrorTests(unittest.TestCase):
2632 def setUp(self):
2633 class Str(str):
2634 pass
2635
Victor Stinnerafe17062012-10-31 22:47:43 +01002636 self.bytes_filenames = []
2637 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002638 if support.TESTFN_UNENCODABLE is not None:
2639 decoded = support.TESTFN_UNENCODABLE
2640 else:
2641 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002642 self.unicode_filenames.append(decoded)
2643 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002644 if support.TESTFN_UNDECODABLE is not None:
2645 encoded = support.TESTFN_UNDECODABLE
2646 else:
2647 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002648 self.bytes_filenames.append(encoded)
2649 self.bytes_filenames.append(memoryview(encoded))
2650
2651 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002652
2653 def test_oserror_filename(self):
2654 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002655 (self.filenames, os.chdir,),
2656 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002657 (self.filenames, os.lstat,),
2658 (self.filenames, os.open, os.O_RDONLY),
2659 (self.filenames, os.rmdir,),
2660 (self.filenames, os.stat,),
2661 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002662 ]
2663 if sys.platform == "win32":
2664 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002665 (self.bytes_filenames, os.rename, b"dst"),
2666 (self.bytes_filenames, os.replace, b"dst"),
2667 (self.unicode_filenames, os.rename, "dst"),
2668 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002669 # Issue #16414: Don't test undecodable names with listdir()
2670 # because of a Windows bug.
2671 #
2672 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2673 # empty list (instead of failing), whereas os.listdir(b'\xff')
2674 # raises a FileNotFoundError. It looks like a Windows bug:
2675 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2676 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2677 # ERROR_PATH_NOT_FOUND (3).
2678 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002679 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002680 else:
2681 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002682 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002683 (self.filenames, os.rename, "dst"),
2684 (self.filenames, os.replace, "dst"),
2685 ))
2686 if hasattr(os, "chown"):
2687 funcs.append((self.filenames, os.chown, 0, 0))
2688 if hasattr(os, "lchown"):
2689 funcs.append((self.filenames, os.lchown, 0, 0))
2690 if hasattr(os, "truncate"):
2691 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002692 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002693 funcs.append((self.filenames, os.chflags, 0))
2694 if hasattr(os, "lchflags"):
2695 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002696 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002697 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002698 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002699 if sys.platform == "win32":
2700 funcs.append((self.bytes_filenames, os.link, b"dst"))
2701 funcs.append((self.unicode_filenames, os.link, "dst"))
2702 else:
2703 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002704 if hasattr(os, "listxattr"):
2705 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002706 (self.filenames, os.listxattr,),
2707 (self.filenames, os.getxattr, "user.test"),
2708 (self.filenames, os.setxattr, "user.test", b'user'),
2709 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002710 ))
2711 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002712 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002713 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002714 if sys.platform == "win32":
2715 funcs.append((self.unicode_filenames, os.readlink,))
2716 else:
2717 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002718
Victor Stinnerafe17062012-10-31 22:47:43 +01002719 for filenames, func, *func_args in funcs:
2720 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002721 try:
2722 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002723 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002724 self.assertIs(err.filename, name)
2725 else:
2726 self.fail("No exception thrown by {}".format(func))
2727
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002728class CPUCountTests(unittest.TestCase):
2729 def test_cpu_count(self):
2730 cpus = os.cpu_count()
2731 if cpus is not None:
2732 self.assertIsInstance(cpus, int)
2733 self.assertGreater(cpus, 0)
2734 else:
2735 self.skipTest("Could not determine the number of CPUs")
2736
Victor Stinnerdaf45552013-08-28 00:53:59 +02002737
2738class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002739 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002740 fd = os.open(__file__, os.O_RDONLY)
2741 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002742 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002743
Victor Stinnerdaf45552013-08-28 00:53:59 +02002744 os.set_inheritable(fd, True)
2745 self.assertEqual(os.get_inheritable(fd), True)
2746
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002747 @unittest.skipIf(fcntl is None, "need fcntl")
2748 def test_get_inheritable_cloexec(self):
2749 fd = os.open(__file__, os.O_RDONLY)
2750 self.addCleanup(os.close, fd)
2751 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002752
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002753 # clear FD_CLOEXEC flag
2754 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2755 flags &= ~fcntl.FD_CLOEXEC
2756 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002757
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002758 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002759
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002760 @unittest.skipIf(fcntl is None, "need fcntl")
2761 def test_set_inheritable_cloexec(self):
2762 fd = os.open(__file__, os.O_RDONLY)
2763 self.addCleanup(os.close, fd)
2764 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2765 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002766
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002767 os.set_inheritable(fd, True)
2768 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2769 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002770
Victor Stinnerdaf45552013-08-28 00:53:59 +02002771 def test_open(self):
2772 fd = os.open(__file__, os.O_RDONLY)
2773 self.addCleanup(os.close, fd)
2774 self.assertEqual(os.get_inheritable(fd), False)
2775
2776 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2777 def test_pipe(self):
2778 rfd, wfd = os.pipe()
2779 self.addCleanup(os.close, rfd)
2780 self.addCleanup(os.close, wfd)
2781 self.assertEqual(os.get_inheritable(rfd), False)
2782 self.assertEqual(os.get_inheritable(wfd), False)
2783
2784 def test_dup(self):
2785 fd1 = os.open(__file__, os.O_RDONLY)
2786 self.addCleanup(os.close, fd1)
2787
2788 fd2 = os.dup(fd1)
2789 self.addCleanup(os.close, fd2)
2790 self.assertEqual(os.get_inheritable(fd2), False)
2791
2792 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2793 def test_dup2(self):
2794 fd = os.open(__file__, os.O_RDONLY)
2795 self.addCleanup(os.close, fd)
2796
2797 # inheritable by default
2798 fd2 = os.open(__file__, os.O_RDONLY)
2799 try:
2800 os.dup2(fd, fd2)
2801 self.assertEqual(os.get_inheritable(fd2), True)
2802 finally:
2803 os.close(fd2)
2804
2805 # force non-inheritable
2806 fd3 = os.open(__file__, os.O_RDONLY)
2807 try:
2808 os.dup2(fd, fd3, inheritable=False)
2809 self.assertEqual(os.get_inheritable(fd3), False)
2810 finally:
2811 os.close(fd3)
2812
2813 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2814 def test_openpty(self):
2815 master_fd, slave_fd = os.openpty()
2816 self.addCleanup(os.close, master_fd)
2817 self.addCleanup(os.close, slave_fd)
2818 self.assertEqual(os.get_inheritable(master_fd), False)
2819 self.assertEqual(os.get_inheritable(slave_fd), False)
2820
2821
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002822@unittest.skipUnless(hasattr(os, 'get_blocking'),
2823 'needs os.get_blocking() and os.set_blocking()')
2824class BlockingTests(unittest.TestCase):
2825 def test_blocking(self):
2826 fd = os.open(__file__, os.O_RDONLY)
2827 self.addCleanup(os.close, fd)
2828 self.assertEqual(os.get_blocking(fd), True)
2829
2830 os.set_blocking(fd, False)
2831 self.assertEqual(os.get_blocking(fd), False)
2832
2833 os.set_blocking(fd, True)
2834 self.assertEqual(os.get_blocking(fd), True)
2835
2836
Yury Selivanov97e2e062014-09-26 12:33:06 -04002837
2838class ExportsTests(unittest.TestCase):
2839 def test_os_all(self):
2840 self.assertIn('open', os.__all__)
2841 self.assertIn('walk', os.__all__)
2842
2843
Victor Stinner6036e442015-03-08 01:58:04 +01002844class TestScandir(unittest.TestCase):
2845 def setUp(self):
2846 self.path = os.path.realpath(support.TESTFN)
2847 self.addCleanup(support.rmtree, self.path)
2848 os.mkdir(self.path)
2849
2850 def create_file(self, name="file.txt"):
2851 filename = os.path.join(self.path, name)
2852 with open(filename, "wb") as fp:
2853 fp.write(b'python')
2854 return filename
2855
2856 def get_entries(self, names):
2857 entries = dict((entry.name, entry)
2858 for entry in os.scandir(self.path))
2859 self.assertEqual(sorted(entries.keys()), names)
2860 return entries
2861
2862 def assert_stat_equal(self, stat1, stat2, skip_fields):
2863 if skip_fields:
2864 for attr in dir(stat1):
2865 if not attr.startswith("st_"):
2866 continue
2867 if attr in ("st_dev", "st_ino", "st_nlink"):
2868 continue
2869 self.assertEqual(getattr(stat1, attr),
2870 getattr(stat2, attr),
2871 (stat1, stat2, attr))
2872 else:
2873 self.assertEqual(stat1, stat2)
2874
2875 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
2876 self.assertEqual(entry.name, name)
2877 self.assertEqual(entry.path, os.path.join(self.path, name))
2878 self.assertEqual(entry.inode(),
2879 os.stat(entry.path, follow_symlinks=False).st_ino)
2880
2881 entry_stat = os.stat(entry.path)
2882 self.assertEqual(entry.is_dir(),
2883 stat.S_ISDIR(entry_stat.st_mode))
2884 self.assertEqual(entry.is_file(),
2885 stat.S_ISREG(entry_stat.st_mode))
2886 self.assertEqual(entry.is_symlink(),
2887 os.path.islink(entry.path))
2888
2889 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2890 self.assertEqual(entry.is_dir(follow_symlinks=False),
2891 stat.S_ISDIR(entry_lstat.st_mode))
2892 self.assertEqual(entry.is_file(follow_symlinks=False),
2893 stat.S_ISREG(entry_lstat.st_mode))
2894
2895 self.assert_stat_equal(entry.stat(),
2896 entry_stat,
2897 os.name == 'nt' and not is_symlink)
2898 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2899 entry_lstat,
2900 os.name == 'nt')
2901
2902 def test_attributes(self):
2903 link = hasattr(os, 'link')
2904 symlink = support.can_symlink()
2905
2906 dirname = os.path.join(self.path, "dir")
2907 os.mkdir(dirname)
2908 filename = self.create_file("file.txt")
2909 if link:
2910 os.link(filename, os.path.join(self.path, "link_file.txt"))
2911 if symlink:
2912 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2913 target_is_directory=True)
2914 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2915
2916 names = ['dir', 'file.txt']
2917 if link:
2918 names.append('link_file.txt')
2919 if symlink:
2920 names.extend(('symlink_dir', 'symlink_file.txt'))
2921 entries = self.get_entries(names)
2922
2923 entry = entries['dir']
2924 self.check_entry(entry, 'dir', True, False, False)
2925
2926 entry = entries['file.txt']
2927 self.check_entry(entry, 'file.txt', False, True, False)
2928
2929 if link:
2930 entry = entries['link_file.txt']
2931 self.check_entry(entry, 'link_file.txt', False, True, False)
2932
2933 if symlink:
2934 entry = entries['symlink_dir']
2935 self.check_entry(entry, 'symlink_dir', True, False, True)
2936
2937 entry = entries['symlink_file.txt']
2938 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2939
2940 def get_entry(self, name):
2941 entries = list(os.scandir(self.path))
2942 self.assertEqual(len(entries), 1)
2943
2944 entry = entries[0]
2945 self.assertEqual(entry.name, name)
2946 return entry
2947
2948 def create_file_entry(self):
2949 filename = self.create_file()
2950 return self.get_entry(os.path.basename(filename))
2951
2952 def test_current_directory(self):
2953 filename = self.create_file()
2954 old_dir = os.getcwd()
2955 try:
2956 os.chdir(self.path)
2957
2958 # call scandir() without parameter: it must list the content
2959 # of the current directory
2960 entries = dict((entry.name, entry) for entry in os.scandir())
2961 self.assertEqual(sorted(entries.keys()),
2962 [os.path.basename(filename)])
2963 finally:
2964 os.chdir(old_dir)
2965
2966 def test_repr(self):
2967 entry = self.create_file_entry()
2968 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
2969
2970 def test_removed_dir(self):
2971 path = os.path.join(self.path, 'dir')
2972
2973 os.mkdir(path)
2974 entry = self.get_entry('dir')
2975 os.rmdir(path)
2976
2977 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2978 if os.name == 'nt':
2979 self.assertTrue(entry.is_dir())
2980 self.assertFalse(entry.is_file())
2981 self.assertFalse(entry.is_symlink())
2982 if os.name == 'nt':
2983 self.assertRaises(FileNotFoundError, entry.inode)
2984 # don't fail
2985 entry.stat()
2986 entry.stat(follow_symlinks=False)
2987 else:
2988 self.assertGreater(entry.inode(), 0)
2989 self.assertRaises(FileNotFoundError, entry.stat)
2990 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2991
2992 def test_removed_file(self):
2993 entry = self.create_file_entry()
2994 os.unlink(entry.path)
2995
2996 self.assertFalse(entry.is_dir())
2997 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2998 if os.name == 'nt':
2999 self.assertTrue(entry.is_file())
3000 self.assertFalse(entry.is_symlink())
3001 if os.name == 'nt':
3002 self.assertRaises(FileNotFoundError, entry.inode)
3003 # don't fail
3004 entry.stat()
3005 entry.stat(follow_symlinks=False)
3006 else:
3007 self.assertGreater(entry.inode(), 0)
3008 self.assertRaises(FileNotFoundError, entry.stat)
3009 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3010
3011 def test_broken_symlink(self):
3012 if not support.can_symlink():
3013 return self.skipTest('cannot create symbolic link')
3014
3015 filename = self.create_file("file.txt")
3016 os.symlink(filename,
3017 os.path.join(self.path, "symlink.txt"))
3018 entries = self.get_entries(['file.txt', 'symlink.txt'])
3019 entry = entries['symlink.txt']
3020 os.unlink(filename)
3021
3022 self.assertGreater(entry.inode(), 0)
3023 self.assertFalse(entry.is_dir())
3024 self.assertFalse(entry.is_file()) # broken symlink returns False
3025 self.assertFalse(entry.is_dir(follow_symlinks=False))
3026 self.assertFalse(entry.is_file(follow_symlinks=False))
3027 self.assertTrue(entry.is_symlink())
3028 self.assertRaises(FileNotFoundError, entry.stat)
3029 # don't fail
3030 entry.stat(follow_symlinks=False)
3031
3032 def test_bytes(self):
3033 if os.name == "nt":
3034 # On Windows, os.scandir(bytes) must raise an exception
3035 self.assertRaises(TypeError, os.scandir, b'.')
3036 return
3037
3038 self.create_file("file.txt")
3039
3040 path_bytes = os.fsencode(self.path)
3041 entries = list(os.scandir(path_bytes))
3042 self.assertEqual(len(entries), 1, entries)
3043 entry = entries[0]
3044
3045 self.assertEqual(entry.name, b'file.txt')
3046 self.assertEqual(entry.path,
3047 os.fsencode(os.path.join(self.path, 'file.txt')))
3048
3049 def test_empty_path(self):
3050 self.assertRaises(FileNotFoundError, os.scandir, '')
3051
3052 def test_consume_iterator_twice(self):
3053 self.create_file("file.txt")
3054 iterator = os.scandir(self.path)
3055
3056 entries = list(iterator)
3057 self.assertEqual(len(entries), 1, entries)
3058
3059 # check than consuming the iterator twice doesn't raise exception
3060 entries2 = list(iterator)
3061 self.assertEqual(len(entries2), 0, entries2)
3062
3063 def test_bad_path_type(self):
3064 for obj in [1234, 1.234, {}, []]:
3065 self.assertRaises(TypeError, os.scandir, obj)
3066
3067
Fred Drake2e2be372001-09-20 21:33:42 +00003068if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003069 unittest.main()