blob: 07682f2bf9acdcbfcd7670cb4e5fe5461fb537ff [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner47aacc82015-06-12 17:26:23 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
# Gids of every group that lists the current user as a member, plus the
# gid of the running process.  Empty when the grp module is unavailable.
try:
    import grp
    # The user name does not change while scanning the group database, so
    # look it up once instead of once per group entry.
    _current_user = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
# Uids of every account in the password database.  Empty when the pwd
# module is unavailable.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True when the process has an effective uid of 0; always False on
# platforms that do not provide os.geteuid().
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated on FreeBSD, so other platforms never call it.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd') else False)
84
Thomas Wouters0e3f5912006-08-11 14:57:12 +000085# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions that create support.TESTFN."""

    def setUp(self):
        # lexists() also reports a dangling symlink, such as the one that
        # test_symlink_keywords may leave behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same clean-up runs both before and after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the reference count of
        # its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind through the raw descriptor before reading it back.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        # Close the descriptor even when a check fails, so that it is not
        # leaked and tearDown() can still unlink the file.
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new hidden console; expect exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        with os.fdopen(fd, *args):
            pass

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # os.replace() must overwrite the existing destination.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter must be accepted by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter must be accepted by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test works on a three-byte file.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Verify the os.stat() result for *fname* (a str or bytes path)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name.startswith('ST_'):
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes are truncated to an int before being
                    # compared with the indexed tuple item.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the result must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Whether this is
        # rejected is deliberately not asserted; only a TypeError is
        # tolerated.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def _statvfs_or_skip(self):
        """Return os.statvfs(self.fname), skipping the test on ENOSYS.

        Any other OSError is re-raised, so the real failure is reported
        instead of an unbound-variable error in the caller.
        """
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  As with stat_result,
        # rejection is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that result.st_file_attributes is an int in [0, 0xFFFFFFFF]."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
433
Victor Stinner47aacc82015-06-12 17:26:23 +0200434
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() through each of its calling conventions."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

        def restore_float_times(state):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)

                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)

            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        """Return True if *filename* shows a sub-second timestamp."""
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[stat.ST_ATIME])
                or (st.st_mtime != st[stat.ST_MTIME])
                or (st.st_ctime != st[stat.ST_CTIME]))

    def _test_utime(self, set_time, filename=None):
        """Set timestamps through *set_time* and verify them with os.stat().

        set_time(filename, (atime_ns, mtime_ns)) must apply the given
        nanosecond timestamps to *filename*, which defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        """Convert *ns* nanoseconds (int) to seconds (float)."""
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            # NOTE: 'wb' truncates the file; only its timestamps are
            # checked afterwards.
            with open(filename, 'wb') as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Check that set_time(filename) stamps self.fname with "now"."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the filesystem name of *path* on Windows, else None."""
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # passes with the NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
628
629
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000630from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000631
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000632class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000633 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000634 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000635
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000636 def setUp(self):
637 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000638 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000639 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000640 for key, value in self._reference().items():
641 os.environ[key] = value
642
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000643 def tearDown(self):
644 os.environ.clear()
645 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000646 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000647 os.environb.clear()
648 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000649
Christian Heimes90333392007-11-01 19:08:42 +0000650 def _reference(self):
651 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
652
653 def _empty_mapping(self):
654 os.environ.clear()
655 return os.environ
656
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000657 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300658 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000659 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000660 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300661 os.environ.update(HELLO="World")
662 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
663 value = popen.read().strip()
664 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000665
Ezio Melottic7e139b2012-09-26 20:01:34 +0300666 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000667 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300668 with os.popen(
669 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
670 it = iter(popen)
671 self.assertEqual(next(it), "line1\n")
672 self.assertEqual(next(it), "line2\n")
673 self.assertEqual(next(it), "line3\n")
674 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000675
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000676 # Verify environ keys and values from the OS are of the
677 # correct str type.
678 def test_keyvalue_types(self):
679 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000680 self.assertEqual(type(key), str)
681 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000682
Christian Heimes90333392007-11-01 19:08:42 +0000683 def test_items(self):
684 for key, value in self._reference().items():
685 self.assertEqual(os.environ.get(key), value)
686
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000687 # Issue 7310
688 def test___repr__(self):
689 """Check that the repr() of os.environ looks like environ({...})."""
690 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000691 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
692 '{!r}: {!r}'.format(key, value)
693 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000694
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000695 def test_get_exec_path(self):
696 defpath_list = os.defpath.split(os.pathsep)
697 test_path = ['/monty', '/python', '', '/flying/circus']
698 test_env = {'PATH': os.pathsep.join(test_path)}
699
700 saved_environ = os.environ
701 try:
702 os.environ = dict(test_env)
703 # Test that defaulting to os.environ works.
704 self.assertSequenceEqual(test_path, os.get_exec_path())
705 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
706 finally:
707 os.environ = saved_environ
708
709 # No PATH environment variable
710 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
711 # Empty PATH environment variable
712 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
713 # Supplied PATH environment variable
714 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
715
Victor Stinnerb745a742010-05-18 17:17:23 +0000716 if os.supports_bytes_environ:
717 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000718 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000719 # ignore BytesWarning warning
720 with warnings.catch_warnings(record=True):
721 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000722 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000723 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000724 pass
725 else:
726 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000727
728 # bytes key and/or value
729 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
730 ['abc'])
731 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
732 ['abc'])
733 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
734 ['abc'])
735
736 @unittest.skipUnless(os.supports_bytes_environ,
737 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000738 def test_environb(self):
739 # os.environ -> os.environb
740 value = 'euro\u20ac'
741 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000742 value_bytes = value.encode(sys.getfilesystemencoding(),
743 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000744 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000745 msg = "U+20AC character is not encodable to %s" % (
746 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000747 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000748 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000749 self.assertEqual(os.environ['unicode'], value)
750 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000751
752 # os.environb -> os.environ
753 value = b'\xff'
754 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000755 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000756 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000757 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000758
Charles-François Natali2966f102011-11-26 11:32:46 +0100759 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
760 # #13415).
761 @support.requires_freebsd_version(7)
762 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100763 def test_unset_error(self):
764 if sys.platform == "win32":
765 # an environment variable is limited to 32,767 characters
766 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100767 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100768 else:
769 # "=" is not allowed in a variable name
770 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100771 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100772
Victor Stinner6d101392013-04-14 16:35:04 +0200773 def test_key_type(self):
774 missing = 'missingkey'
775 self.assertNotIn(missing, os.environ)
776
Victor Stinner839e5ea2013-04-14 16:43:03 +0200777 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200778 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200779 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200780 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200781
Victor Stinner839e5ea2013-04-14 16:43:03 +0200782 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200783 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200784 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200785 self.assertTrue(cm.exception.__suppress_context__)
786
Victor Stinner6d101392013-04-14 16:35:04 +0200787
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    Every test goes through self.walk() so that a subclass overriding
    walk() runs the same checks against a different walker.
    """

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting 'follow_symlinks' as an alias of 'followlinks'."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree described below under support.TESTFN."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' guarantees the handle is closed even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.  Use self.walk() (not os.walk) so that subclasses
        # exercise their own walker here too.
        results = list(self.walk(self.walk_path))

        self.assertEqual(len(results), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = results[0][1][0] != "SUB1"
        results[0][1].sort()
        results[3 - 2 * flipped][-1].sort()
        self.assertEqual(results[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(results[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(results[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(results[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        results = []
        for root, dirs, files in self.walk(self.walk_path):
            results.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to results!
                dirs.remove('SUB1')

        self.assertEqual(len(results), 2)
        self.assertEqual(results[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        results[1][-1].sort()
        self.assertEqual(results[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        results = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(results), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = results[3][1][0] != "SUB1"
        results[3][1].sort()
        results[2 - 2 * flipped][-1].sort()
        self.assertEqual(results[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(results[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(results[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(results[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    # A symlink to a directory is removed like a file.
                    os.remove(dirname)
        os.rmdir(support.TESTFN)

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        # Rename the first subdirectory after it has been listed, so the
        # walker fails when it tries to descend into the old name.
        dir1 = dirs[0]
        dir1new = dir1 + '.new'
        os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
        roots = [r for r, d, f in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(os.path.join(root, dir1), roots)
        self.assertNotIn(os.path.join(root, dir1new), roots)
        for dir2 in dirs[1:]:
            self.assertIn(os.path.join(root, dir2), roots)
941
Charles-François Natali7372b062012-02-05 15:15:38 +0100942
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Yield os.fwalk() results without the trailing directory fd."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            # Yield the very same list objects so that callers pruning
            # 'dirs' in place still influence the traversal.
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are
        compared for every combination of topdown and symlink following.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the original error must not
        # be masked by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory must be
                # unlinked, not rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1020
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        """Walk a bytes path with os.walk(), yielding str-decoded entries."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            dirs = [os.fsdecode(entry) for entry in raw_dirs]
            files = [os.fsdecode(entry) for entry in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Copy any edits the caller made to the decoded lists back
            # into the lists os.walk() handed out.
            raw_dirs[:] = [os.fsencode(entry) for entry in dirs]
            raw_files[:] = [os.fsencode(entry) for entry in files]
1033
Charles-François Natali7372b062012-02-05 15:15:38 +01001034
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so that a failure
        # here cannot change the permissions used by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # A regular file in the way is an error whatever exist_ok says.
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1116
Andrew Svetlov405faed2012-12-25 12:18:09 +02001117
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), sharing one support.TESTFN directory per class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # None of these values is an integer, so each must be rejected in
        # either the uid or the gid position.
        non_integers = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # NOTE(review): the second call only runs when the first one
        # succeeds -- presumably to cover the case where uid_1 already owns
        # the file; confirm before simplifying.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1170
1171
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb hierarchy."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        """Create TESTFN/dira/dirb and return both paths as (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain disappears.
        dira, dirb = self._make_nested()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning right above dirb.
        dira, dirb = self._make_nested()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes the leaf removal fail outright.
        dira, dirb = self._make_nested()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1213
1214
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        """Writes to os.devnull are discarded and reads return no data."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001222
Andrew Svetlov405faed2012-12-25 12:18:09 +02001223
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result must be exactly as long as requested, including 0.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes produced by os.urandom() in a child interpreter."""
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16 so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001252
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001253
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001260
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor."""

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # If urandom() had read from the substituted file, both halves
        # would be identical runs of b"x".
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1333
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001334
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager replacing os.execv and os.execve with recording stubs.

    Yields the list of recorded calls; each entry is a tuple of
    (function name, first arg, remaining args).  The stubs always raise,
    since a real exec call would normally never return.  When *defpath* is
    not None, os.defpath is overridden as well.  All three attributes are
    restored on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = (os.execv, os.execve, os.defpath)
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1367
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        *test_type* is either str or bytes.  The exec functions are
        replaced by recording stubs, so no process is ever replaced.
        """
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            native_fullpath = (os.fsencode(fullpath) if os.name != "nt"
                               else fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only run off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001431
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001432
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls on support.TESTFN raise OSError."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # A regular file already occupies the name, so mkdir() must fail.
        blocker = open(support.TESTFN, "w")
        try:
            # Leaving the inner "with" closes the file before it is unlinked.
            with blocker:
                with self.assertRaises(OSError):
                    os.mkdir(support.TESTFN)
        finally:
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001457
class TestInvalidFD(unittest.TestCase):
    """Check how fd-taking os functions react to a bad file descriptor."""

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately not in the list above: it doesn't raise an
    # exception on some platforms.

    def get_single(f):
        # Build a test method for the os function named *f* that takes the
        # descriptor as its only argument; a missing function is a no-op.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Generate one test_<name> method per entry of ``singles``.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError carrying EBADF."""
        bad_fd = support.make_bad_fd()
        try:
            f(bad_fd, *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            # fd + i is a valid descriptor: stop right before it.
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-or-fd flavour and the fd-only flavour are checked.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-or-fd flavour and the fd-only flavour are checked.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        scratch = bytearray(10)
        self.check(os.readv, [scratch])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1556
Brian Curtin1b9df392010-11-24 20:24:31 +00001557
class LinkTests(unittest.TestCase):
    """os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test managed to create.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, check both are one file."""
        with open(file1, "w") as source:
            source.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same_file = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same_file)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1594
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error handling of the os.set*uid() / os.set*gid() functions.

    An id of 1<<32 must raise OverflowError.  Switching to id 0 must raise
    OSError when the process is not root (the gid variants are additionally
    gated on HAVE_WHEEL_GROUP being false).
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # Either argument being out of range must be detected.
        for ids in ((1<<32, 0), (0, 1<<32)):
            self.assertRaises(OverflowError, os.setreuid, *ids)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # Either argument being out of range must be detected.
        for ids in ((1<<32, 0), (0, 1<<32)):
            self.assertRaises(OverflowError, os.setregid, *ids)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001650
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """List, open and stat files whose names are not plain ASCII."""

    def setUp(self):
        """Create self.dir holding one empty file per usable test name.

        The decoded names of the created files are collected in
        self.unicodefn.  The test is skipped when none of the candidate
        names can be encoded.
        """
        # Prefer the most demanding directory name that support offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                # Not representable with the filesystem encoding: drop it.
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() does not run when setUp() fails, so clean up here
            # before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file for reading must simply succeed.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001722
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for at most max_tries * 0.1 seconds.  The "else" branch runs
        # when the loop ends without "break", i.e. nothing was received.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Send the console control *event* to a child process and fail the
        # test (mentioning *name*) if the child is still running afterwards.
        tagname = "test_os_%s" % uuid.uuid1()
        # One shared byte, passed to the child by tag name; the loop below
        # waits for it to become 1 as the "child is ready" signal.
        m = mmap.mmap(-1, 1, tagname)
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 must not be mistaken for "still alive".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1837
1838
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and FILE0,
        # FILE1 (files); created_paths keeps their sorted names.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(str_listing, self.created_paths)
        # bytes
        bytes_listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        str_listing = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_listing, self.created_paths)
        # bytes
        bytes_listing = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(
            bytes_listing,
            [os.fsencode(name) for name in self.created_paths])
1883
1884
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and none of the link names is
        # taken.  TestCase assertions are used so that the checks are still
        # performed when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target does not exist, so exists()
        # would not see the link itself.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name given as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the name is always bound.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir above the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir below the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link.  The guard
            # keeps a failed os.mkdir(level1) from being masked by a second
            # error raised during cleanup.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
2002
Brian Curtind40e6f72010-07-08 21:39:08 +00002003
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and the junction name is free.
        # TestCase assertions are used so that the checks are still
        # performed when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2033
2034
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the backslash in the diagram below would otherwise
        # be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A TestCase assertion rather than a bare assert, so the check is
        # not stripped when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
2065
2066
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not encodable with the filesystem encoding: nothing to
                # round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002080
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002081
Brett Cannonefb00c02012-02-29 18:31:31 -05002082
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or all(hasattr(locale, attr)
                   for attr in ('nl_langinfo', 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding must be
        # reported and it must name a codec that can be looked up.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        codec = codecs.lookup(name)
        self.assertTrue(codec)
2096
2097
class PidTests(unittest.TestCase):
    """Checks for os.getppid() and os.waitpid() using child interpreters."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        reported_parent = int(output)
        self.assertEqual(reported_parent, os.getpid())

    def test_waitpid(self):
        # Spawn a child that does nothing and expect waitpid() to report
        # that pid together with a zero status.
        argv = [sys.executable, '-c', 'pass']
        pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
        outcome = os.waitpid(pid, 0)
        self.assertEqual(outcome, (pid, 0))
2113
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002114
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # Any non-empty login name is accepted.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2123
2124
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        pid = os.getpid()

        base = os.getpriority(which, pid)
        raised = base + 1
        os.setpriority(which, pid, raised)
        try:
            new_prio = os.getpriority(which, pid)
            if base >= 19 and new_prio <= 19:
                # Already at the top nice level: the increment cannot be
                # observed, so the result says nothing.
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, raised)
        finally:
            # Best effort: put the original priority back.  EACCES is
            # tolerated; any other error is propagated.
            try:
                os.setpriority(which, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2147
2148
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection gets a Handler that greets the peer and
        records every byte it receives; the most recent handler is kept in
        ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers everything it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client reads to synchronize with the server.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                # Set last: wait() polls this flag.
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead of
                # letting it be swallowed.
                raise

        def __init__(self, address):
            """Bind to *address* and start listening (thread not started)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to exit and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            try:
                while self._active and asyncore.socket_map:
                    # ``with`` guarantees the lock is released even when the
                    # loop iteration raises.
                    with self._active_lock:
                        asyncore.loop(timeout=0.001, count=1)
            finally:
                # Close every channel even when the loop exits by exception.
                asyncore.close_all()

        def handle_accept(self):
            pair = self.accept()
            if pair is None:
                # accept() may return None instead of a (conn, addr) pair;
                # there is nothing to serve in that case.
                return
            conn, _addr = pair
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2232
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002233
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    Every test sends (part of) a file of known content over a client socket
    and compares what the server-side handler recorded.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Headers/trailers are treated as unsupported on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(),
                         trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY and returns
        the number of bytes it reports as sent.  Any other OSError
        (including ECONNRESET, i.e. the peer disconnected) is propagated.
        *headers* and *trailers* are only forwarded where they are supported.
        """
        # The defaults are immutable tuples so that no mutable object is
        # shared between calls.
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared rather than the payloads,
        # presumably to keep the failure output short -- confirm.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before the file is created so that it is
        # removed even if writing or reopening it fails part-way.
        # NOTE(review): relies on support.unlink() tolerating a missing
        # file -- confirm.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated with this flag; anything else
            # is a real failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002420
2421
def supports_extended_attributes():
    """Return True if extended attributes are usable for testing.

    False is returned when os.setxattr() is missing, when setting a
    ``user.*`` attribute on a scratch file raises OSError, or when
    platform.release() reports a 2.6 kernel older than 2.6.39.
    The scratch file is always removed again.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: "\d" is not a valid escape in a plain
    # string literal, and an unescaped "." would match any character.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2437
2438
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the get/set/remove/list xattr functions of the os module."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # ``s`` converts an attribute name to the type under test (str or
        # bytes); the four callables are the xattr functions to exercise and
        # ``kwargs`` is forwarded to all of them except listxattr.
        fn = support.TESTFN
        with open(fn, "wb"):
            pass

        first = s("user.test")
        second = s("user.test2")

        def expect_errno(code, func, *args):
            # func(fn, *args) must fail with OSError carrying ``code``.
            with self.assertRaises(OSError) as caught:
                func(fn, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        def listed():
            return set(listxattr(fn))

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, first)
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Set an empty value, then replace it.
        setxattr(fn, first, b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(listed(), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses existing names, XATTR_REPLACE missing ones.
        expect_errno(errno.EEXIST, setxattr, first, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr, second, b"bye", os.XATTR_REPLACE)
        setxattr(fn, second, b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(listed(), xattr)

        # Removal makes the attribute unreadable and unlisted.
        removexattr(fn, first, **kwargs)
        expect_errno(errno.ENODATA, getxattr, first)
        xattr.remove("user.test")
        self.assertEqual(listed(), xattr)
        self.assertEqual(getxattr(fn, second, **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(fn, first, big_value, **kwargs)
        self.assertEqual(getxattr(fn, first, **kwargs), big_value)
        removexattr(fn, first, **kwargs)

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(fn, attr_name, b"x", **kwargs)
        self.assertEqual(listed(), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the whole scenario twice: once with str attribute names and
        # once with bytes names, on a fresh file each time.
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def via_fd(func, mode):
            # Adapt ``func`` so that it receives a file descriptor, obtained
            # by opening the path with ``mode``, instead of the path.
            def call(path, *args):
                with open(path, mode) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb"),
                           via_fd(os.removexattr, "wb"),
                           via_fd(os.listxattr, "rb"))
2514
2515
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Calls made with a bytes filename must emit DeprecationWarning."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by its positional arguments.
        deprecated_calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            for call in deprecated_calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2551
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002552
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _get_size_or_skip(self, *args):
        # Shared by both tests: call os.get_terminal_size(*args) and turn
        # the "no terminal available" failures into a skip.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that a failing stty does not write its
            # complaint into the test output.
            output = subprocess.check_output(['stty', 'size'],
                                             stderr=subprocess.DEVNULL)
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        size = output.decode().split()
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2595
2596
class OSErrorTests(unittest.TestCase):
    """OSError.filename must be the very object passed to the failing call."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer names that cannot be encoded/decoded when they are
        # available, otherwise fall back to the plain test filename.
        decoded = (support.TESTFN_UNENCODABLE
                   if support.TESTFN_UNENCODABLE is not None
                   else support.TESTFN)
        encoded = (support.TESTFN_UNDECODABLE
                   if support.TESTFN_UNDECODABLE is not None
                   else os.fsencode(support.TESTFN))

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (names to try, function, extra positional arguments...)
        funcs = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (text_names, os.listdir,),
            ]
        else:
            funcs += [
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]
        if hasattr(os, "chown"):
            funcs.append((every_name, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            funcs.append((every_name, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            funcs.append((every_name, os.truncate, 0))
        if hasattr(os, "chflags"):
            funcs.append((every_name, os.chflags, 0))
        if hasattr(os, "lchflags"):
            funcs.append((every_name, os.lchflags, 0))
        if hasattr(os, "chroot"):
            funcs.append((every_name, os.chroot,))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((text_names, os.readlink,))
            else:
                funcs.append((every_name, os.readlink,))

        for entry in funcs:
            names, func, extra = entry[0], entry[1], entry[2:]
            for name in names:
                try:
                    func(name, *extra)
                except OSError as err:
                    # Identity, not equality: the original object must be
                    # attached to the exception.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2693
class CPUCountTests(unittest.TestCase):
    """Check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # Nothing to verify when the count is unknown.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2702
Victor Stinnerdaf45552013-08-28 00:53:59 +02002703
2704class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002705 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002706 fd = os.open(__file__, os.O_RDONLY)
2707 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002708 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002709
Victor Stinnerdaf45552013-08-28 00:53:59 +02002710 os.set_inheritable(fd, True)
2711 self.assertEqual(os.get_inheritable(fd), True)
2712
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002713 @unittest.skipIf(fcntl is None, "need fcntl")
2714 def test_get_inheritable_cloexec(self):
2715 fd = os.open(__file__, os.O_RDONLY)
2716 self.addCleanup(os.close, fd)
2717 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002718
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002719 # clear FD_CLOEXEC flag
2720 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2721 flags &= ~fcntl.FD_CLOEXEC
2722 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002723
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002724 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002725
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002726 @unittest.skipIf(fcntl is None, "need fcntl")
2727 def test_set_inheritable_cloexec(self):
2728 fd = os.open(__file__, os.O_RDONLY)
2729 self.addCleanup(os.close, fd)
2730 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2731 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002732
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002733 os.set_inheritable(fd, True)
2734 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2735 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002736
Victor Stinnerdaf45552013-08-28 00:53:59 +02002737 def test_open(self):
2738 fd = os.open(__file__, os.O_RDONLY)
2739 self.addCleanup(os.close, fd)
2740 self.assertEqual(os.get_inheritable(fd), False)
2741
2742 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2743 def test_pipe(self):
2744 rfd, wfd = os.pipe()
2745 self.addCleanup(os.close, rfd)
2746 self.addCleanup(os.close, wfd)
2747 self.assertEqual(os.get_inheritable(rfd), False)
2748 self.assertEqual(os.get_inheritable(wfd), False)
2749
2750 def test_dup(self):
2751 fd1 = os.open(__file__, os.O_RDONLY)
2752 self.addCleanup(os.close, fd1)
2753
2754 fd2 = os.dup(fd1)
2755 self.addCleanup(os.close, fd2)
2756 self.assertEqual(os.get_inheritable(fd2), False)
2757
2758 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2759 def test_dup2(self):
2760 fd = os.open(__file__, os.O_RDONLY)
2761 self.addCleanup(os.close, fd)
2762
2763 # inheritable by default
2764 fd2 = os.open(__file__, os.O_RDONLY)
2765 try:
2766 os.dup2(fd, fd2)
2767 self.assertEqual(os.get_inheritable(fd2), True)
2768 finally:
2769 os.close(fd2)
2770
2771 # force non-inheritable
2772 fd3 = os.open(__file__, os.O_RDONLY)
2773 try:
2774 os.dup2(fd, fd3, inheritable=False)
2775 self.assertEqual(os.get_inheritable(fd3), False)
2776 finally:
2777 os.close(fd3)
2778
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
def test_openpty(self):
    # Both descriptors returned by os.openpty() must be non-inheritable.
    pty_pair = os.openpty()
    for end in pty_pair:
        self.addCleanup(os.close, end)
    for end in pty_pair:
        self.assertEqual(os.get_inheritable(end), False)
2786
2787
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        # The descriptor is expected to start out in blocking mode.
        self.assertEqual(os.get_blocking(handle), True)

        # Switch to non-blocking and back, reading the mode after each change.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
2801
2802
Yury Selivanov97e2e062014-09-26 12:33:06 -04002803
class ExportsTests(unittest.TestCase):
    """Checks on the names listed in os.__all__."""

    def test_os_all(self):
        # Both names must be listed in os.__all__.
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, os.__all__)
2808
2809
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the directory entries it yields.

    setUp() creates a fresh scratch directory (``self.path``) for every
    test and registers a cleanup that removes it again.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path."""
        filename = os.path.join(self.path, name)
        with open(filename, "wb") as fp:
            fp.write(b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a ``{name: entry}`` dict.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, compare the ``st_*`` attributes one by
        one and ignore st_dev, st_ino and st_nlink; otherwise compare the
        two results as a whole.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and the file system.

        *is_dir*, *is_file* and *is_symlink* are the values the entry's
        is_dir(), is_file() and is_symlink() methods are expected to return
        when called without arguments.
        """
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations stated by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check against what os.stat() reports for the same path.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows the stat results are compared field by field with
        # st_dev, st_ino and st_nlink ignored (see assert_stat_equal()).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # Expected names, kept in sorted order for get_entries().
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the scratch directory, named *name*."""
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        """Create the default file and return its scandir() entry."""
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs when skipping
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # removing the target turns the symlink into a broken one
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # a bytes path must produce bytes names and paths
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            # explicit close() inside the with block must be allowed
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that is never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3084
Victor Stinner6036e442015-03-08 01:58:04 +01003085
# Script entry point: hand control to unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()