blob: 08575610d17424702e3382b674a874341dd43222 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Mark Dickinson7cf03892010-04-16 13:45:35 +000073# Detect whether we're on a Linux system that uses the (now outdated
74# and unmaintained) linuxthreads threading library. There's an issue
75# when combining linuxthreads with a failed execv call: see
76# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020077if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79else:
80 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000081
Stefan Krahebee49a2013-01-17 15:31:00 +010082# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
86@contextlib.contextmanager
87def ignore_deprecation_warnings(msg_regex, quiet=False):
88 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
89 yield
90
91
Berker Peksag4af23d72016-09-15 20:32:44 +030092def requires_os_func(name):
93 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
94
95
Brett Cannonec6ce872016-09-06 15:50:29 -070096class _PathLike(os.PathLike):
97
98 def __init__(self, path=""):
99 self.path = path
100
101 def __str__(self):
102 return str(self.path)
103
104 def __fspath__(self):
105 if isinstance(self.path, BaseException):
106 raise self.path
107 else:
108 return self.path
109
110
Victor Stinnerae39d232016-03-24 17:12:55 +0100111def create_file(filename, content=b'content'):
112 with open(filename, "xb", 0) as fp:
113 fp.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
117class FileTests(unittest.TestCase):
118 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000119 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000120 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121 tearDown = setUp
122
123 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000124 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000126 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000127
Christian Heimesfdab48e2008-01-20 09:06:41 +0000128 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000129 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
130 # We must allocate two consecutive file descriptors, otherwise
131 # it will mess up other file descriptors (perhaps even the three
132 # standard ones).
133 second = os.dup(first)
134 try:
135 retries = 0
136 while second != first + 1:
137 os.close(first)
138 retries += 1
139 if retries > 10:
140 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000141 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000142 first, second = second, os.dup(second)
143 finally:
144 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000145 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000146 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000147 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000148
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000149 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000150 def test_rename(self):
151 path = support.TESTFN
152 old = sys.getrefcount(path)
153 self.assertRaises(TypeError, os.rename, path, 0)
154 new = sys.getrefcount(path)
155 self.assertEqual(old, new)
156
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000157 def test_read(self):
158 with open(support.TESTFN, "w+b") as fobj:
159 fobj.write(b"spam")
160 fobj.flush()
161 fd = fobj.fileno()
162 os.lseek(fd, 0, 0)
163 s = os.read(fd, 4)
164 self.assertEqual(type(s), bytes)
165 self.assertEqual(s, b"spam")
166
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200167 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200168 # Skip the test on 32-bit platforms: the number of bytes must fit in a
169 # Py_ssize_t type
170 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
171 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200172 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
173 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200174 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100175 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200176
177 # Issue #21932: Make sure that os.read() does not raise an
178 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200179 with open(support.TESTFN, "rb") as fp:
180 data = os.read(fp.fileno(), size)
181
182 # The test does not try to read more than 2 GB at once because the
183 # operating system is free to return less bytes than requested.
184 self.assertEqual(data, b'test')
185
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000186 def test_write(self):
187 # os.write() accepts bytes- and buffer-like objects but not strings
188 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
189 self.assertRaises(TypeError, os.write, fd, "beans")
190 os.write(fd, b"bacon\n")
191 os.write(fd, bytearray(b"eggs\n"))
192 os.write(fd, memoryview(b"spam\n"))
193 os.close(fd)
194 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000195 self.assertEqual(fobj.read().splitlines(),
196 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000197
Victor Stinnere0daff12011-03-20 23:36:35 +0100198 def write_windows_console(self, *args):
199 retcode = subprocess.call(args,
200 # use a new console to not flood the test output
201 creationflags=subprocess.CREATE_NEW_CONSOLE,
202 # use a shell to hide the console window (SW_HIDE)
203 shell=True)
204 self.assertEqual(retcode, 0)
205
206 @unittest.skipUnless(sys.platform == 'win32',
207 'test specific to the Windows console')
208 def test_write_windows_console(self):
209 # Issue #11395: the Windows console returns an error (12: not enough
210 # space error) on writing into stdout if stdout mode is binary and the
211 # length is greater than 66,000 bytes (or less, depending on heap
212 # usage).
213 code = "print('x' * 100000)"
214 self.write_windows_console(sys.executable, "-c", code)
215 self.write_windows_console(sys.executable, "-u", "-c", code)
216
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000217 def fdopen_helper(self, *args):
218 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200219 f = os.fdopen(fd, *args)
220 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000221
222 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200223 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
224 os.close(fd)
225
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000226 self.fdopen_helper()
227 self.fdopen_helper('r')
228 self.fdopen_helper('r', 100)
229
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100230 def test_replace(self):
231 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100232 self.addCleanup(support.unlink, support.TESTFN)
233 self.addCleanup(support.unlink, TESTFN2)
234
235 create_file(support.TESTFN, b"1")
236 create_file(TESTFN2, b"2")
237
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100238 os.replace(support.TESTFN, TESTFN2)
239 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
240 with open(TESTFN2, 'r') as f:
241 self.assertEqual(f.read(), "1")
242
Martin Panterbf19d162015-09-09 01:01:13 +0000243 def test_open_keywords(self):
244 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
245 dir_fd=None)
246 os.close(f)
247
248 def test_symlink_keywords(self):
249 symlink = support.get_attribute(os, "symlink")
250 try:
251 symlink(src='target', dst=support.TESTFN,
252 target_is_directory=False, dir_fd=None)
253 except (NotImplementedError, OSError):
254 pass # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
258class StatAttributeTests(unittest.TestCase):
259 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200260 self.fname = support.TESTFN
261 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100262 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Serhiy Storchaka43767632013-11-03 21:31:38 +0200264 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000265 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000266 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267
268 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(result[stat.ST_SIZE], 3)
270 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 # Make sure all the attributes are there
273 members = dir(result)
274 for name in dir(stat):
275 if name[:3] == 'ST_':
276 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000277 if name.endswith("TIME"):
278 def trunc(x): return int(x)
279 else:
280 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000281 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000283 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284
Larry Hastings6fe20b32012-04-19 15:07:49 -0700285 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700286 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700287 for name in 'st_atime st_mtime st_ctime'.split():
288 floaty = int(getattr(result, name) * 100000)
289 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700290 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700291
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000292 try:
293 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200294 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 except IndexError:
296 pass
297
298 # Make sure that assignment fails
299 try:
300 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200301 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000302 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000303 pass
304
305 try:
306 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200307 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000308 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000309 pass
310
311 try:
312 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200313 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000314 except AttributeError:
315 pass
316
317 # Use the stat_result constructor with a too-short tuple.
318 try:
319 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200320 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000321 except TypeError:
322 pass
323
Ezio Melotti42da6632011-03-15 05:18:48 +0200324 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000325 try:
326 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
327 except TypeError:
328 pass
329
Antoine Pitrou38425292010-09-21 18:19:07 +0000330 def test_stat_attributes(self):
331 self.check_stat_attributes(self.fname)
332
333 def test_stat_attributes_bytes(self):
334 try:
335 fname = self.fname.encode(sys.getfilesystemencoding())
336 except UnicodeEncodeError:
337 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700338 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000339
Christian Heimes25827622013-10-12 01:27:08 +0200340 def test_stat_result_pickle(self):
341 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200342 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
343 p = pickle.dumps(result, proto)
344 self.assertIn(b'stat_result', p)
345 if proto < 4:
346 self.assertIn(b'cos\nstat_result\n', p)
347 unpickled = pickle.loads(p)
348 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200349
Serhiy Storchaka43767632013-11-03 21:31:38 +0200350 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000351 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000352 try:
353 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000354 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000355 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000356 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200357 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000358
359 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000360 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000362 # Make sure all the attributes are there.
363 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
364 'ffree', 'favail', 'flag', 'namemax')
365 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000366 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000367
368 # Make sure that assignment really fails
369 try:
370 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200371 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000372 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000373 pass
374
375 try:
376 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200377 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000378 except AttributeError:
379 pass
380
381 # Use the constructor with a too-short tuple.
382 try:
383 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200384 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000385 except TypeError:
386 pass
387
Ezio Melotti42da6632011-03-15 05:18:48 +0200388 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000389 try:
390 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
391 except TypeError:
392 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000393
Christian Heimes25827622013-10-12 01:27:08 +0200394 @unittest.skipUnless(hasattr(os, 'statvfs'),
395 "need os.statvfs()")
396 def test_statvfs_result_pickle(self):
397 try:
398 result = os.statvfs(self.fname)
399 except OSError as e:
400 # On AtheOS, glibc always returns ENOSYS
401 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200402 self.skipTest('os.statvfs() failed with ENOSYS')
403
Serhiy Storchakabad12572014-12-15 14:03:42 +0200404 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
405 p = pickle.dumps(result, proto)
406 self.assertIn(b'statvfs_result', p)
407 if proto < 4:
408 self.assertIn(b'cos\nstatvfs_result\n', p)
409 unpickled = pickle.loads(p)
410 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200411
Serhiy Storchaka43767632013-11-03 21:31:38 +0200412 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
413 def test_1686475(self):
414 # Verify that an open file can be stat'ed
415 try:
416 os.stat(r"c:\pagefile.sys")
417 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600418 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200419 except OSError as e:
420 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000421
Serhiy Storchaka43767632013-11-03 21:31:38 +0200422 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
423 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
424 def test_15261(self):
425 # Verify that stat'ing a closed fd does not cause crash
426 r, w = os.pipe()
427 try:
428 os.stat(r) # should not raise error
429 finally:
430 os.close(r)
431 os.close(w)
432 with self.assertRaises(OSError) as ctx:
433 os.stat(r)
434 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100435
Zachary Ware63f277b2014-06-19 09:46:37 -0500436 def check_file_attributes(self, result):
437 self.assertTrue(hasattr(result, 'st_file_attributes'))
438 self.assertTrue(isinstance(result.st_file_attributes, int))
439 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
440
441 @unittest.skipUnless(sys.platform == "win32",
442 "st_file_attributes is Win32 specific")
443 def test_file_attributes(self):
444 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
445 result = os.stat(self.fname)
446 self.check_file_attributes(result)
447 self.assertEqual(
448 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
449 0)
450
451 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200452 dirname = support.TESTFN + "dir"
453 os.mkdir(dirname)
454 self.addCleanup(os.rmdir, dirname)
455
456 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500457 self.check_file_attributes(result)
458 self.assertEqual(
459 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
460 stat.FILE_ATTRIBUTE_DIRECTORY)
461
Berker Peksag0b4dc482016-09-17 15:49:59 +0300462 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
463 def test_access_denied(self):
464 # Default to FindFirstFile WIN32_FIND_DATA when access is
465 # denied. See issue 28075.
466 # os.environ['TEMP'] should be located on a volume that
467 # supports file ACLs.
468 fname = os.path.join(os.environ['TEMP'], self.fname)
469 self.addCleanup(support.unlink, fname)
470 create_file(fname, b'ABC')
471 # Deny the right to [S]YNCHRONIZE on the file to
472 # force CreateFile to fail with ERROR_ACCESS_DENIED.
473 DETACHED_PROCESS = 8
474 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500475 # bpo-30584: Use security identifier *S-1-5-32-545 instead
476 # of localized "Users" to not depend on the locale.
477 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300478 creationflags=DETACHED_PROCESS
479 )
480 result = os.stat(fname)
481 self.assertNotEqual(result.st_size, 0)
482
Victor Stinner47aacc82015-06-12 17:26:23 +0200483
484class UtimeTests(unittest.TestCase):
485 def setUp(self):
486 self.dirname = support.TESTFN
487 self.fname = os.path.join(self.dirname, "f1")
488
489 self.addCleanup(support.rmtree, self.dirname)
490 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100491 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200492
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200493 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100494 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200495 os.stat_float_times(state)
496
Victor Stinner47aacc82015-06-12 17:26:23 +0200497 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100498 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200499 old_float_times = os.stat_float_times(-1)
500 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200501
502 os.stat_float_times(True)
503
504 def support_subsecond(self, filename):
505 # Heuristic to check if the filesystem supports timestamp with
506 # subsecond resolution: check if float and int timestamps are different
507 st = os.stat(filename)
508 return ((st.st_atime != st[7])
509 or (st.st_mtime != st[8])
510 or (st.st_ctime != st[9]))
511
512 def _test_utime(self, set_time, filename=None):
513 if not filename:
514 filename = self.fname
515
516 support_subsecond = self.support_subsecond(filename)
517 if support_subsecond:
518 # Timestamp with a resolution of 1 microsecond (10^-6).
519 #
520 # The resolution of the C internal function used by os.utime()
521 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
522 # test with a resolution of 1 ns requires more work:
523 # see the issue #15745.
524 atime_ns = 1002003000 # 1.002003 seconds
525 mtime_ns = 4005006000 # 4.005006 seconds
526 else:
527 # use a resolution of 1 second
528 atime_ns = 5 * 10**9
529 mtime_ns = 8 * 10**9
530
531 set_time(filename, (atime_ns, mtime_ns))
532 st = os.stat(filename)
533
534 if support_subsecond:
535 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
536 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
537 else:
538 self.assertEqual(st.st_atime, atime_ns * 1e-9)
539 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
540 self.assertEqual(st.st_atime_ns, atime_ns)
541 self.assertEqual(st.st_mtime_ns, mtime_ns)
542
543 def test_utime(self):
544 def set_time(filename, ns):
545 # test the ns keyword parameter
546 os.utime(filename, ns=ns)
547 self._test_utime(set_time)
548
549 @staticmethod
550 def ns_to_sec(ns):
551 # Convert a number of nanosecond (int) to a number of seconds (float).
552 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
553 # issue, os.utime() rounds towards minus infinity.
554 return (ns * 1e-9) + 0.5e-9
555
556 def test_utime_by_indexed(self):
557 # pass times as floating point seconds as the second indexed parameter
558 def set_time(filename, ns):
559 atime_ns, mtime_ns = ns
560 atime = self.ns_to_sec(atime_ns)
561 mtime = self.ns_to_sec(mtime_ns)
562 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
563 # or utime(time_t)
564 os.utime(filename, (atime, mtime))
565 self._test_utime(set_time)
566
567 def test_utime_by_times(self):
568 def set_time(filename, ns):
569 atime_ns, mtime_ns = ns
570 atime = self.ns_to_sec(atime_ns)
571 mtime = self.ns_to_sec(mtime_ns)
572 # test the times keyword parameter
573 os.utime(filename, times=(atime, mtime))
574 self._test_utime(set_time)
575
576 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
577 "follow_symlinks support for utime required "
578 "for this test.")
579 def test_utime_nofollow_symlinks(self):
580 def set_time(filename, ns):
581 # use follow_symlinks=False to test utimensat(timespec)
582 # or lutimes(timeval)
583 os.utime(filename, ns=ns, follow_symlinks=False)
584 self._test_utime(set_time)
585
586 @unittest.skipUnless(os.utime in os.supports_fd,
587 "fd support for utime required for this test.")
588 def test_utime_fd(self):
589 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100590 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200591 # use a file descriptor to test futimens(timespec)
592 # or futimes(timeval)
593 os.utime(fp.fileno(), ns=ns)
594 self._test_utime(set_time)
595
596 @unittest.skipUnless(os.utime in os.supports_dir_fd,
597 "dir_fd support for utime required for this test.")
598 def test_utime_dir_fd(self):
599 def set_time(filename, ns):
600 dirname, name = os.path.split(filename)
601 dirfd = os.open(dirname, os.O_RDONLY)
602 try:
603 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
604 os.utime(name, dir_fd=dirfd, ns=ns)
605 finally:
606 os.close(dirfd)
607 self._test_utime(set_time)
608
609 def test_utime_directory(self):
610 def set_time(filename, ns):
611 # test calling os.utime() on a directory
612 os.utime(filename, ns=ns)
613 self._test_utime(set_time, filename=self.dirname)
614
615 def _test_utime_current(self, set_time):
616 # Get the system clock
617 current = time.time()
618
619 # Call os.utime() to set the timestamp to the current system clock
620 set_time(self.fname)
621
622 if not self.support_subsecond(self.fname):
623 delta = 1.0
Victor Stinnerc94caca2017-06-13 23:48:27 +0200624 elif os.name == 'nt':
625 # On Windows, the usual resolution of time.time() is 15.6 ms.
626 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
627 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200628 else:
Victor Stinner3402f722017-06-14 11:55:17 +0200629 # bpo-30649: PPC64 Fedora 3.x buildbot requires
630 # at least a delta of 14 ms
631 delta = 0.020
Victor Stinner47aacc82015-06-12 17:26:23 +0200632 st = os.stat(self.fname)
633 msg = ("st_time=%r, current=%r, dt=%r"
634 % (st.st_mtime, current, st.st_mtime - current))
635 self.assertAlmostEqual(st.st_mtime, current,
636 delta=delta, msg=msg)
637
638 def test_utime_current(self):
639 def set_time(filename):
640 # Set to the current time in the new way
641 os.utime(self.fname)
642 self._test_utime_current(set_time)
643
644 def test_utime_current_old(self):
645 def set_time(filename):
646 # Set to the current time in the old explicit way.
647 os.utime(self.fname, None)
648 self._test_utime_current(set_time)
649
650 def get_file_system(self, path):
651 if sys.platform == 'win32':
652 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
653 import ctypes
654 kernel32 = ctypes.windll.kernel32
655 buf = ctypes.create_unicode_buffer("", 100)
656 ok = kernel32.GetVolumeInformationW(root, None, 0,
657 None, None, None,
658 buf, len(buf))
659 if ok:
660 return buf.value
661 # return None if the filesystem is unknown
662
663 def test_large_time(self):
664 # Many filesystems are limited to the year 2038. At least, the test
665 # pass with NTFS filesystem.
666 if self.get_file_system(self.dirname) != "NTFS":
667 self.skipTest("requires NTFS")
668
669 large = 5000000000 # some day in 2128
670 os.utime(self.fname, (large, large))
671 self.assertEqual(os.stat(self.fname).st_mtime, large)
672
673 def test_utime_invalid_arguments(self):
674 # seconds and nanoseconds parameters are mutually exclusive
675 with self.assertRaises(ValueError):
676 os.utime(self.fname, (5, 5), ns=(5, 5))
677
678
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000679from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000681class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000682 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000683 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000684
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000685 def setUp(self):
686 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000687 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000688 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000689 for key, value in self._reference().items():
690 os.environ[key] = value
691
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000692 def tearDown(self):
693 os.environ.clear()
694 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000695 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000696 os.environb.clear()
697 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000698
Christian Heimes90333392007-11-01 19:08:42 +0000699 def _reference(self):
700 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
701
702 def _empty_mapping(self):
703 os.environ.clear()
704 return os.environ
705
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000706 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200707 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
708 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000709 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000710 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300711 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200712 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300713 value = popen.read().strip()
714 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000715
Xavier de Gayed1415312016-07-22 12:15:29 +0200716 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
717 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000718 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200719 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
720 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300721 it = iter(popen)
722 self.assertEqual(next(it), "line1\n")
723 self.assertEqual(next(it), "line2\n")
724 self.assertEqual(next(it), "line3\n")
725 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000726
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000727 # Verify environ keys and values from the OS are of the
728 # correct str type.
729 def test_keyvalue_types(self):
730 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000731 self.assertEqual(type(key), str)
732 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000733
Christian Heimes90333392007-11-01 19:08:42 +0000734 def test_items(self):
735 for key, value in self._reference().items():
736 self.assertEqual(os.environ.get(key), value)
737
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000738 # Issue 7310
739 def test___repr__(self):
740 """Check that the repr() of os.environ looks like environ({...})."""
741 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000742 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
743 '{!r}: {!r}'.format(key, value)
744 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000745
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000746 def test_get_exec_path(self):
747 defpath_list = os.defpath.split(os.pathsep)
748 test_path = ['/monty', '/python', '', '/flying/circus']
749 test_env = {'PATH': os.pathsep.join(test_path)}
750
751 saved_environ = os.environ
752 try:
753 os.environ = dict(test_env)
754 # Test that defaulting to os.environ works.
755 self.assertSequenceEqual(test_path, os.get_exec_path())
756 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
757 finally:
758 os.environ = saved_environ
759
760 # No PATH environment variable
761 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
762 # Empty PATH environment variable
763 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
764 # Supplied PATH environment variable
765 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
766
Victor Stinnerb745a742010-05-18 17:17:23 +0000767 if os.supports_bytes_environ:
768 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000769 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000770 # ignore BytesWarning warning
771 with warnings.catch_warnings(record=True):
772 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000773 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000774 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000775 pass
776 else:
777 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000778
779 # bytes key and/or value
780 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
781 ['abc'])
782 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
783 ['abc'])
784 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
785 ['abc'])
786
787 @unittest.skipUnless(os.supports_bytes_environ,
788 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000789 def test_environb(self):
790 # os.environ -> os.environb
791 value = 'euro\u20ac'
792 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000793 value_bytes = value.encode(sys.getfilesystemencoding(),
794 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000795 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000796 msg = "U+20AC character is not encodable to %s" % (
797 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000798 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000799 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000800 self.assertEqual(os.environ['unicode'], value)
801 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000802
803 # os.environb -> os.environ
804 value = b'\xff'
805 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000806 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000807 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000808 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000809
Charles-François Natali2966f102011-11-26 11:32:46 +0100810 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
811 # #13415).
812 @support.requires_freebsd_version(7)
813 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100814 def test_unset_error(self):
815 if sys.platform == "win32":
816 # an environment variable is limited to 32,767 characters
817 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100818 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100819 else:
820 # "=" is not allowed in a variable name
821 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100822 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100823
Victor Stinner6d101392013-04-14 16:35:04 +0200824 def test_key_type(self):
825 missing = 'missingkey'
826 self.assertNotIn(missing, os.environ)
827
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200829 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200830 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200831 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200832
Victor Stinner839e5ea2013-04-14 16:43:03 +0200833 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200834 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200835 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200836 self.assertTrue(cm.exception.__suppress_context__)
837
Victor Stinner6d101392013-04-14 16:35:04 +0200838
Tim Petersc4e09402003-04-25 07:11:48 +0000839class WalkTests(unittest.TestCase):
840 """Tests for os.walk()."""
841
Victor Stinner0561c532015-03-12 10:28:24 +0100842 # Wrapper to hide minor differences between os.walk and os.fwalk
843 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200844 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200845 if 'follow_symlinks' in kwargs:
846 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200847 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100848
Charles-François Natali7372b062012-02-05 15:15:38 +0100849 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100850 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100851 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000852
853 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000854 # TESTFN/
855 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000856 # tmp1
857 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000858 # tmp2
859 # SUB11/ no kids
860 # SUB2/ a file kid and a dirsymlink kid
861 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300862 # SUB21/ not readable
863 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000864 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200865 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300866 # broken_link2
867 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000868 # TEST2/
869 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100870 self.walk_path = join(support.TESTFN, "TEST1")
871 self.sub1_path = join(self.walk_path, "SUB1")
872 self.sub11_path = join(self.sub1_path, "SUB11")
873 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300874 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100875 tmp1_path = join(self.walk_path, "tmp1")
876 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000877 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300878 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100879 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000880 t2_path = join(support.TESTFN, "TEST2")
881 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200882 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300883 broken_link2_path = join(sub2_path, "broken_link2")
884 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000885
886 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100887 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000888 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300889 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000890 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100891
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300892 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100893 with open(path, "x") as f:
894 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000895
Victor Stinner0561c532015-03-12 10:28:24 +0100896 if support.can_symlink():
897 os.symlink(os.path.abspath(t2_path), self.link_path)
898 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300899 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
900 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300901 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300902 ["broken_link", "broken_link2", "broken_link3",
903 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100904 else:
905 self.sub2_tree = (sub2_path, [], ["tmp3"])
906
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300907 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300908 try:
909 os.listdir(sub21_path)
910 except PermissionError:
911 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
912 else:
913 os.chmod(sub21_path, stat.S_IRWXU)
914 os.unlink(tmp5_path)
915 os.rmdir(sub21_path)
916 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300917
Victor Stinner0561c532015-03-12 10:28:24 +0100918 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000919 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300920 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100921
Tim Petersc4e09402003-04-25 07:11:48 +0000922 self.assertEqual(len(all), 4)
923 # We can't know which order SUB1 and SUB2 will appear in.
924 # Not flipped: TESTFN, SUB1, SUB11, SUB2
925 # flipped: TESTFN, SUB2, SUB1, SUB11
926 flipped = all[0][1][0] != "SUB1"
927 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200928 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300929 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100930 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
931 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
932 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
933 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000934
Brett Cannon3f9183b2016-08-26 14:44:48 -0700935 def test_walk_prune(self, walk_path=None):
936 if walk_path is None:
937 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000938 # Prune the search.
939 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700940 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000941 all.append((root, dirs, files))
942 # Don't descend into SUB1.
943 if 'SUB1' in dirs:
944 # Note that this also mutates the dirs we appended to all!
945 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000946
Victor Stinner0561c532015-03-12 10:28:24 +0100947 self.assertEqual(len(all), 2)
948 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700949 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100950
951 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300952 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100953 self.assertEqual(all[1], self.sub2_tree)
954
Brett Cannon3f9183b2016-08-26 14:44:48 -0700955 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700956 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700957
Victor Stinner0561c532015-03-12 10:28:24 +0100958 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000959 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100960 all = list(self.walk(self.walk_path, topdown=False))
961
Victor Stinner53b0a412016-03-26 01:12:36 +0100962 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000963 # We can't know which order SUB1 and SUB2 will appear in.
964 # Not flipped: SUB11, SUB1, SUB2, TESTFN
965 # flipped: SUB2, SUB11, SUB1, TESTFN
966 flipped = all[3][1][0] != "SUB1"
967 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200968 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300969 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100970 self.assertEqual(all[3],
971 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
972 self.assertEqual(all[flipped],
973 (self.sub11_path, [], []))
974 self.assertEqual(all[flipped + 1],
975 (self.sub1_path, ["SUB11"], ["tmp2"]))
976 self.assertEqual(all[2 - 2 * flipped],
977 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000978
Victor Stinner0561c532015-03-12 10:28:24 +0100979 def test_walk_symlink(self):
980 if not support.can_symlink():
981 self.skipTest("need symlink support")
982
983 # Walk, following symlinks.
984 walk_it = self.walk(self.walk_path, follow_symlinks=True)
985 for root, dirs, files in walk_it:
986 if root == self.link_path:
987 self.assertEqual(dirs, [])
988 self.assertEqual(files, ["tmp4"])
989 break
990 else:
991 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000992
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200993 def test_walk_bad_dir(self):
994 # Walk top-down.
995 errors = []
996 walk_it = self.walk(self.walk_path, onerror=errors.append)
997 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300998 self.assertEqual(errors, [])
999 dir1 = 'SUB1'
1000 path1 = os.path.join(root, dir1)
1001 path1new = os.path.join(root, dir1 + '.new')
1002 os.rename(path1, path1new)
1003 try:
1004 roots = [r for r, d, f in walk_it]
1005 self.assertTrue(errors)
1006 self.assertNotIn(path1, roots)
1007 self.assertNotIn(path1new, roots)
1008 for dir2 in dirs:
1009 if dir2 != dir1:
1010 self.assertIn(os.path.join(root, dir2), roots)
1011 finally:
1012 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001013
Charles-François Natali7372b062012-02-05 15:15:38 +01001014
1015@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1016class FwalkTests(WalkTests):
1017 """Tests for os.fwalk()."""
1018
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001019 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001020 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001021 yield (root, dirs, files)
1022
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001023 def fwalk(self, *args, **kwargs):
1024 return os.fwalk(*args, **kwargs)
1025
Larry Hastingsc48fe982012-06-25 04:49:05 -07001026 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1027 """
1028 compare with walk() results.
1029 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001030 walk_kwargs = walk_kwargs.copy()
1031 fwalk_kwargs = fwalk_kwargs.copy()
1032 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1033 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1034 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001035
Charles-François Natali7372b062012-02-05 15:15:38 +01001036 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001037 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001038 expected[root] = (set(dirs), set(files))
1039
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001040 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001041 self.assertIn(root, expected)
1042 self.assertEqual(expected[root], (set(dirs), set(files)))
1043
Larry Hastingsc48fe982012-06-25 04:49:05 -07001044 def test_compare_to_walk(self):
1045 kwargs = {'top': support.TESTFN}
1046 self._compare_to_walk(kwargs, kwargs)
1047
Charles-François Natali7372b062012-02-05 15:15:38 +01001048 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001049 try:
1050 fd = os.open(".", os.O_RDONLY)
1051 walk_kwargs = {'top': support.TESTFN}
1052 fwalk_kwargs = walk_kwargs.copy()
1053 fwalk_kwargs['dir_fd'] = fd
1054 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1055 finally:
1056 os.close(fd)
1057
1058 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001059 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001060 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1061 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001062 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001063 # check that the FD is valid
1064 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001065 # redundant check
1066 os.stat(rootfd)
1067 # check that listdir() returns consistent information
1068 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001069
1070 def test_fd_leak(self):
1071 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1072 # we both check that calling fwalk() a large number of times doesn't
1073 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1074 minfd = os.dup(1)
1075 os.close(minfd)
1076 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001077 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001078 pass
1079 newfd = os.dup(1)
1080 self.addCleanup(os.close, newfd)
1081 self.assertEqual(newfd, minfd)
1082
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001083class BytesWalkTests(WalkTests):
1084 """Tests for os.walk() with bytes."""
1085 def walk(self, top, **kwargs):
1086 if 'follow_symlinks' in kwargs:
1087 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1088 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1089 root = os.fsdecode(broot)
1090 dirs = list(map(os.fsdecode, bdirs))
1091 files = list(map(os.fsdecode, bfiles))
1092 yield (root, dirs, files)
1093 bdirs[:] = list(map(os.fsencode, dirs))
1094 bfiles[:] = list(map(os.fsencode, files))
1095
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001096@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1097class BytesFwalkTests(FwalkTests):
1098 """Tests for os.walk() with bytes."""
1099 def fwalk(self, top='.', *args, **kwargs):
1100 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1101 root = os.fsdecode(broot)
1102 dirs = list(map(os.fsdecode, bdirs))
1103 files = list(map(os.fsdecode, bfiles))
1104 yield (root, dirs, files, topfd)
1105 bdirs[:] = list(map(os.fsencode, dirs))
1106 bfiles[:] = list(map(os.fsencode, files))
1107
Charles-François Natali7372b062012-02-05 15:15:38 +01001108
Guido van Rossume7ba4952007-06-06 23:52:48 +00001109class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001110 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001111 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001112
1113 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001114 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001115 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1116 os.makedirs(path) # Should work
1117 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1118 os.makedirs(path)
1119
1120 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001121 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001122 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1123 os.makedirs(path)
1124 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1125 'dir5', 'dir6')
1126 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001127
Serhiy Storchakae304e332017-03-24 13:27:42 +02001128 def test_mode(self):
1129 with support.temp_umask(0o002):
1130 base = support.TESTFN
1131 parent = os.path.join(base, 'dir1')
1132 path = os.path.join(parent, 'dir2')
1133 os.makedirs(path, 0o555)
1134 self.assertTrue(os.path.exists(path))
1135 self.assertTrue(os.path.isdir(path))
1136 if os.name != 'nt':
1137 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1138 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1139
Terry Reedy5a22b652010-12-02 07:05:56 +00001140 def test_exist_ok_existing_directory(self):
1141 path = os.path.join(support.TESTFN, 'dir1')
1142 mode = 0o777
1143 old_mask = os.umask(0o022)
1144 os.makedirs(path, mode)
1145 self.assertRaises(OSError, os.makedirs, path, mode)
1146 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001147 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001148 os.makedirs(path, mode=mode, exist_ok=True)
1149 os.umask(old_mask)
1150
Martin Pantera82642f2015-11-19 04:48:44 +00001151 # Issue #25583: A drive root could raise PermissionError on Windows
1152 os.makedirs(os.path.abspath('/'), exist_ok=True)
1153
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001154 def test_exist_ok_s_isgid_directory(self):
1155 path = os.path.join(support.TESTFN, 'dir1')
1156 S_ISGID = stat.S_ISGID
1157 mode = 0o777
1158 old_mask = os.umask(0o022)
1159 try:
1160 existing_testfn_mode = stat.S_IMODE(
1161 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001162 try:
1163 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001164 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001165 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001166 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1167 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1168 # The os should apply S_ISGID from the parent dir for us, but
1169 # this test need not depend on that behavior. Be explicit.
1170 os.makedirs(path, mode | S_ISGID)
1171 # http://bugs.python.org/issue14992
1172 # Should not fail when the bit is already set.
1173 os.makedirs(path, mode, exist_ok=True)
1174 # remove the bit.
1175 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001176 # May work even when the bit is not already set when demanded.
1177 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001178 finally:
1179 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001180
1181 def test_exist_ok_existing_regular_file(self):
1182 base = support.TESTFN
1183 path = os.path.join(support.TESTFN, 'dir1')
1184 f = open(path, 'w')
1185 f.write('abc')
1186 f.close()
1187 self.assertRaises(OSError, os.makedirs, path)
1188 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1189 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1190 os.remove(path)
1191
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001192 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001193 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001194 'dir4', 'dir5', 'dir6')
1195 # If the tests failed, the bottom-most directory ('../dir6')
1196 # may not have been created, so we look for the outermost directory
1197 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001198 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001199 path = os.path.dirname(path)
1200
1201 os.removedirs(path)
1202
Andrew Svetlov405faed2012-12-25 12:18:09 +02001203
R David Murrayf2ad1732014-12-25 18:36:56 -05001204@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1205class ChownFileTests(unittest.TestCase):
1206
Berker Peksag036a71b2015-07-21 09:29:48 +03001207 @classmethod
1208 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001209 os.mkdir(support.TESTFN)
1210
1211 def test_chown_uid_gid_arguments_must_be_index(self):
1212 stat = os.stat(support.TESTFN)
1213 uid = stat.st_uid
1214 gid = stat.st_gid
1215 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1216 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1217 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1218 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1219 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1220
1221 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1222 def test_chown(self):
1223 gid_1, gid_2 = groups[:2]
1224 uid = os.stat(support.TESTFN).st_uid
1225 os.chown(support.TESTFN, uid, gid_1)
1226 gid = os.stat(support.TESTFN).st_gid
1227 self.assertEqual(gid, gid_1)
1228 os.chown(support.TESTFN, uid, gid_2)
1229 gid = os.stat(support.TESTFN).st_gid
1230 self.assertEqual(gid, gid_2)
1231
1232 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1233 "test needs root privilege and more than one user")
1234 def test_chown_with_root(self):
1235 uid_1, uid_2 = all_users[:2]
1236 gid = os.stat(support.TESTFN).st_gid
1237 os.chown(support.TESTFN, uid_1, gid)
1238 uid = os.stat(support.TESTFN).st_uid
1239 self.assertEqual(uid, uid_1)
1240 os.chown(support.TESTFN, uid_2, gid)
1241 uid = os.stat(support.TESTFN).st_uid
1242 self.assertEqual(uid, uid_2)
1243
1244 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1245 "test needs non-root account and more than one user")
1246 def test_chown_without_permission(self):
1247 uid_1, uid_2 = all_users[:2]
1248 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001249 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001250 os.chown(support.TESTFN, uid_1, gid)
1251 os.chown(support.TESTFN, uid_2, gid)
1252
Berker Peksag036a71b2015-07-21 09:29:48 +03001253 @classmethod
1254 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001255 os.rmdir(support.TESTFN)
1256
1257
Andrew Svetlov405faed2012-12-25 12:18:09 +02001258class RemoveDirsTests(unittest.TestCase):
1259 def setUp(self):
1260 os.makedirs(support.TESTFN)
1261
1262 def tearDown(self):
1263 support.rmtree(support.TESTFN)
1264
1265 def test_remove_all(self):
1266 dira = os.path.join(support.TESTFN, 'dira')
1267 os.mkdir(dira)
1268 dirb = os.path.join(dira, 'dirb')
1269 os.mkdir(dirb)
1270 os.removedirs(dirb)
1271 self.assertFalse(os.path.exists(dirb))
1272 self.assertFalse(os.path.exists(dira))
1273 self.assertFalse(os.path.exists(support.TESTFN))
1274
1275 def test_remove_partial(self):
1276 dira = os.path.join(support.TESTFN, 'dira')
1277 os.mkdir(dira)
1278 dirb = os.path.join(dira, 'dirb')
1279 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001280 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001281 os.removedirs(dirb)
1282 self.assertFalse(os.path.exists(dirb))
1283 self.assertTrue(os.path.exists(dira))
1284 self.assertTrue(os.path.exists(support.TESTFN))
1285
1286 def test_remove_nothing(self):
1287 dira = os.path.join(support.TESTFN, 'dira')
1288 os.mkdir(dira)
1289 dirb = os.path.join(dira, 'dirb')
1290 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001291 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001292 with self.assertRaises(OSError):
1293 os.removedirs(dirb)
1294 self.assertTrue(os.path.exists(dirb))
1295 self.assertTrue(os.path.exists(dira))
1296 self.assertTrue(os.path.exists(support.TESTFN))
1297
1298
Guido van Rossume7ba4952007-06-06 23:52:48 +00001299class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001300 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001301 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001302 f.write(b'hello')
1303 f.close()
1304 with open(os.devnull, 'rb') as f:
1305 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001306
Andrew Svetlov405faed2012-12-25 12:18:09 +02001307
Guido van Rossume7ba4952007-06-06 23:52:48 +00001308class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001309 def test_urandom_length(self):
1310 self.assertEqual(len(os.urandom(0)), 0)
1311 self.assertEqual(len(os.urandom(1)), 1)
1312 self.assertEqual(len(os.urandom(10)), 10)
1313 self.assertEqual(len(os.urandom(100)), 100)
1314 self.assertEqual(len(os.urandom(1000)), 1000)
1315
1316 def test_urandom_value(self):
1317 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001318 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001319 data2 = os.urandom(16)
1320 self.assertNotEqual(data1, data2)
1321
1322 def get_urandom_subprocess(self, count):
1323 code = '\n'.join((
1324 'import os, sys',
1325 'data = os.urandom(%s)' % count,
1326 'sys.stdout.buffer.write(data)',
1327 'sys.stdout.buffer.flush()'))
1328 out = assert_python_ok('-c', code)
1329 stdout = out[1]
1330 self.assertEqual(len(stdout), 16)
1331 return stdout
1332
1333 def test_urandom_subprocess(self):
1334 data1 = self.get_urandom_subprocess(16)
1335 data2 = self.get_urandom_subprocess(16)
1336 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001337
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001338
Victor Stinner9b1f4742016-09-06 16:18:52 -07001339@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1340class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001341 @classmethod
1342 def setUpClass(cls):
1343 try:
1344 os.getrandom(1)
1345 except OSError as exc:
1346 if exc.errno == errno.ENOSYS:
1347 # Python compiled on a more recent Linux version
1348 # than the current Linux kernel
1349 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1350 else:
1351 raise
1352
Victor Stinner9b1f4742016-09-06 16:18:52 -07001353 def test_getrandom_type(self):
1354 data = os.getrandom(16)
1355 self.assertIsInstance(data, bytes)
1356 self.assertEqual(len(data), 16)
1357
1358 def test_getrandom0(self):
1359 empty = os.getrandom(0)
1360 self.assertEqual(empty, b'')
1361
1362 def test_getrandom_random(self):
1363 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1364
1365 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1366 # resource /dev/random
1367
1368 def test_getrandom_nonblock(self):
1369 # The call must not fail. Check also that the flag exists
1370 try:
1371 os.getrandom(1, os.GRND_NONBLOCK)
1372 except BlockingIOError:
1373 # System urandom is not initialized yet
1374 pass
1375
1376 def test_getrandom_value(self):
1377 data1 = os.getrandom(16)
1378 data2 = os.getrandom(16)
1379 self.assertNotEqual(data1, data2)
1380
1381
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001382# os.urandom() doesn't use a file descriptor when it is implemented with the
1383# getentropy() function, the getrandom() function or the getrandom() syscall
1384OS_URANDOM_DONT_USE_FD = (
1385 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1386 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1387 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001388
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001389@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1390 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001391class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001392 @unittest.skipUnless(resource, "test requires the resource module")
1393 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001394 # Check urandom() failing when it is not able to open /dev/random.
1395 # We spawn a new process to make the test more robust (if getrlimit()
1396 # failed to restore the file descriptor limit after this, the whole
1397 # test suite would crash; this actually happened on the OS X Tiger
1398 # buildbot).
1399 code = """if 1:
1400 import errno
1401 import os
1402 import resource
1403
1404 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1405 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1406 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001407 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001408 except OSError as e:
1409 assert e.errno == errno.EMFILE, e.errno
1410 else:
1411 raise AssertionError("OSError not raised")
1412 """
1413 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001414
Antoine Pitroue472aea2014-04-26 14:33:03 +02001415 def test_urandom_fd_closed(self):
1416 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1417 # closed.
1418 code = """if 1:
1419 import os
1420 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001421 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001422 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001423 with test.support.SuppressCrashReport():
1424 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001425 sys.stdout.buffer.write(os.urandom(4))
1426 """
1427 rc, out, err = assert_python_ok('-Sc', code)
1428
1429 def test_urandom_fd_reopened(self):
1430 # Issue #21207: urandom() should detect its fd to /dev/urandom
1431 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001432 self.addCleanup(support.unlink, support.TESTFN)
1433 create_file(support.TESTFN, b"x" * 256)
1434
Antoine Pitroue472aea2014-04-26 14:33:03 +02001435 code = """if 1:
1436 import os
1437 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001438 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001439 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001440 with test.support.SuppressCrashReport():
1441 for fd in range(3, 256):
1442 try:
1443 os.close(fd)
1444 except OSError:
1445 pass
1446 else:
1447 # Found the urandom fd (XXX hopefully)
1448 break
1449 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001450 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001451 new_fd = f.fileno()
1452 # Issue #26935: posix allows new_fd and fd to be equal but
1453 # some libc implementations have dup2 return an error in this
1454 # case.
1455 if new_fd != fd:
1456 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001457 sys.stdout.buffer.write(os.urandom(4))
1458 sys.stdout.buffer.write(os.urandom(4))
1459 """.format(TESTFN=support.TESTFN)
1460 rc, out, err = assert_python_ok('-Sc', code)
1461 self.assertEqual(len(out), 8)
1462 self.assertNotEqual(out[0:4], out[4:8])
1463 rc, out2, err2 = assert_python_ok('-Sc', code)
1464 self.assertEqual(len(out2), 8)
1465 self.assertNotEqual(out2, out)
1466
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001467
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001468@contextlib.contextmanager
1469def _execvpe_mockup(defpath=None):
1470 """
1471 Stubs out execv and execve functions when used as context manager.
1472 Records exec calls. The mock execv and execve functions always raise an
1473 exception as they would normally never return.
1474 """
1475 # A list of tuples containing (function name, first arg, args)
1476 # of calls to execv or execve that have been made.
1477 calls = []
1478
1479 def mock_execv(name, *args):
1480 calls.append(('execv', name, args))
1481 raise RuntimeError("execv called")
1482
1483 def mock_execve(name, *args):
1484 calls.append(('execve', name, args))
1485 raise OSError(errno.ENOTDIR, "execve called")
1486
1487 try:
1488 orig_execv = os.execv
1489 orig_execve = os.execve
1490 orig_defpath = os.defpath
1491 os.execv = mock_execv
1492 os.execve = mock_execve
1493 if defpath is not None:
1494 os.defpath = defpath
1495 yield calls
1496 finally:
1497 os.execv = orig_execv
1498 os.execve = orig_execve
1499 os.defpath = orig_defpath
1500
Victor Stinner4659ccf2016-09-14 10:57:00 +02001501
Guido van Rossume7ba4952007-06-06 23:52:48 +00001502class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001503 @unittest.skipIf(USING_LINUXTHREADS,
1504 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001505 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001506 self.assertRaises(OSError, os.execvpe, 'no such app-',
1507 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001508
Steve Dowerbce26262016-11-19 19:17:26 -08001509 def test_execv_with_bad_arglist(self):
1510 self.assertRaises(ValueError, os.execv, 'notepad', ())
1511 self.assertRaises(ValueError, os.execv, 'notepad', [])
1512 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1513 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1514
Thomas Heller6790d602007-08-30 17:15:14 +00001515 def test_execvpe_with_bad_arglist(self):
1516 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001517 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1518 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001519
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001520 @unittest.skipUnless(hasattr(os, '_execvpe'),
1521 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001522 def _test_internal_execvpe(self, test_type):
1523 program_path = os.sep + 'absolutepath'
1524 if test_type is bytes:
1525 program = b'executable'
1526 fullpath = os.path.join(os.fsencode(program_path), program)
1527 native_fullpath = fullpath
1528 arguments = [b'progname', 'arg1', 'arg2']
1529 else:
1530 program = 'executable'
1531 arguments = ['progname', 'arg1', 'arg2']
1532 fullpath = os.path.join(program_path, program)
1533 if os.name != "nt":
1534 native_fullpath = os.fsencode(fullpath)
1535 else:
1536 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001537 env = {'spam': 'beans'}
1538
Victor Stinnerb745a742010-05-18 17:17:23 +00001539 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001540 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001541 self.assertRaises(RuntimeError,
1542 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001543 self.assertEqual(len(calls), 1)
1544 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1545
Victor Stinnerb745a742010-05-18 17:17:23 +00001546 # test os._execvpe() with a relative path:
1547 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001548 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001549 self.assertRaises(OSError,
1550 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001551 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001552 self.assertSequenceEqual(calls[0],
1553 ('execve', native_fullpath, (arguments, env)))
1554
1555 # test os._execvpe() with a relative path:
1556 # os.get_exec_path() reads the 'PATH' variable
1557 with _execvpe_mockup() as calls:
1558 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001559 if test_type is bytes:
1560 env_path[b'PATH'] = program_path
1561 else:
1562 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001563 self.assertRaises(OSError,
1564 os._execvpe, program, arguments, env=env_path)
1565 self.assertEqual(len(calls), 1)
1566 self.assertSequenceEqual(calls[0],
1567 ('execve', native_fullpath, (arguments, env_path)))
1568
1569 def test_internal_execvpe_str(self):
1570 self._test_internal_execvpe(str)
1571 if os.name != "nt":
1572 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001573
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001574
Serhiy Storchaka43767632013-11-03 21:31:38 +02001575@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001576class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001577 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001578 try:
1579 os.stat(support.TESTFN)
1580 except FileNotFoundError:
1581 exists = False
1582 except OSError as exc:
1583 exists = True
1584 self.fail("file %s must not exist; os.stat failed with %s"
1585 % (support.TESTFN, exc))
1586 else:
1587 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001588
Thomas Wouters477c8d52006-05-27 19:21:47 +00001589 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001590 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001591
1592 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001593 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001594
1595 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001596 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001597
1598 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001599 self.addCleanup(support.unlink, support.TESTFN)
1600
Victor Stinnere77c9742016-03-25 10:28:23 +01001601 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001602 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001603
1604 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001605 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001606
Thomas Wouters477c8d52006-05-27 19:21:47 +00001607 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001608 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001609
Victor Stinnere77c9742016-03-25 10:28:23 +01001610
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001611class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001612 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001613 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1614 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001615 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001616 def get_single(f):
1617 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001618 if hasattr(os, f):
1619 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001620 return helper
1621 for f in singles:
1622 locals()["test_"+f] = get_single(f)
1623
Benjamin Peterson7522c742009-01-19 21:00:09 +00001624 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001625 try:
1626 f(support.make_bad_fd(), *args)
1627 except OSError as e:
1628 self.assertEqual(e.errno, errno.EBADF)
1629 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001630 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001631 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001632
Serhiy Storchaka43767632013-11-03 21:31:38 +02001633 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001634 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001635 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001636
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001638 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001639 fd = support.make_bad_fd()
1640 # Make sure none of the descriptors we are about to close are
1641 # currently valid (issue 6542).
1642 for i in range(10):
1643 try: os.fstat(fd+i)
1644 except OSError:
1645 pass
1646 else:
1647 break
1648 if i < 2:
1649 raise unittest.SkipTest(
1650 "Unable to acquire a range of invalid file descriptors")
1651 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001652
Serhiy Storchaka43767632013-11-03 21:31:38 +02001653 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656
Serhiy Storchaka43767632013-11-03 21:31:38 +02001657 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001658 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001659 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001660
Serhiy Storchaka43767632013-11-03 21:31:38 +02001661 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001666 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001667 self.check(os.pathconf, "PC_NAME_MAX")
1668 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001669
Serhiy Storchaka43767632013-11-03 21:31:38 +02001670 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001671 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001672 self.check(os.truncate, 0)
1673 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001674
Serhiy Storchaka43767632013-11-03 21:31:38 +02001675 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001676 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001678
Serhiy Storchaka43767632013-11-03 21:31:38 +02001679 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001680 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001681 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001682
Victor Stinner57ddf782014-01-08 15:21:28 +01001683 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1684 def test_readv(self):
1685 buf = bytearray(10)
1686 self.check(os.readv, [buf])
1687
Serhiy Storchaka43767632013-11-03 21:31:38 +02001688 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001689 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001691
Serhiy Storchaka43767632013-11-03 21:31:38 +02001692 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001693 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001694 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001695
Victor Stinner57ddf782014-01-08 15:21:28 +01001696 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1697 def test_writev(self):
1698 self.check(os.writev, [b'abc'])
1699
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001700 def test_inheritable(self):
1701 self.check(os.get_inheritable)
1702 self.check(os.set_inheritable, True)
1703
1704 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1705 'needs os.get_blocking() and os.set_blocking()')
1706 def test_blocking(self):
1707 self.check(os.get_blocking)
1708 self.check(os.set_blocking, True)
1709
Brian Curtin1b9df392010-11-24 20:24:31 +00001710
1711class LinkTests(unittest.TestCase):
1712 def setUp(self):
1713 self.file1 = support.TESTFN
1714 self.file2 = os.path.join(support.TESTFN + "2")
1715
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001716 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001717 for file in (self.file1, self.file2):
1718 if os.path.exists(file):
1719 os.unlink(file)
1720
Brian Curtin1b9df392010-11-24 20:24:31 +00001721 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001722 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001723
Steve Dowercc16be82016-09-08 10:35:16 -07001724 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001725 with open(file1, "r") as f1, open(file2, "r") as f2:
1726 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1727
1728 def test_link(self):
1729 self._test_link(self.file1, self.file2)
1730
1731 def test_link_bytes(self):
1732 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1733 bytes(self.file2, sys.getfilesystemencoding()))
1734
Brian Curtinf498b752010-11-30 15:54:04 +00001735 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001736 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001737 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001738 except UnicodeError:
1739 raise unittest.SkipTest("Unable to encode for this platform.")
1740
Brian Curtinf498b752010-11-30 15:54:04 +00001741 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001742 self.file2 = self.file1 + "2"
1743 self._test_link(self.file1, self.file2)
1744
Serhiy Storchaka43767632013-11-03 21:31:38 +02001745@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1746class PosixUidGidTests(unittest.TestCase):
1747 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1748 def test_setuid(self):
1749 if os.getuid() != 0:
1750 self.assertRaises(OSError, os.setuid, 0)
1751 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001752
Serhiy Storchaka43767632013-11-03 21:31:38 +02001753 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1754 def test_setgid(self):
1755 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1756 self.assertRaises(OSError, os.setgid, 0)
1757 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001758
Serhiy Storchaka43767632013-11-03 21:31:38 +02001759 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1760 def test_seteuid(self):
1761 if os.getuid() != 0:
1762 self.assertRaises(OSError, os.seteuid, 0)
1763 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001764
Serhiy Storchaka43767632013-11-03 21:31:38 +02001765 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1766 def test_setegid(self):
1767 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1768 self.assertRaises(OSError, os.setegid, 0)
1769 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001770
Serhiy Storchaka43767632013-11-03 21:31:38 +02001771 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1772 def test_setreuid(self):
1773 if os.getuid() != 0:
1774 self.assertRaises(OSError, os.setreuid, 0, 0)
1775 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1776 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001777
Serhiy Storchaka43767632013-11-03 21:31:38 +02001778 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1779 def test_setreuid_neg1(self):
1780 # Needs to accept -1. We run this in a subprocess to avoid
1781 # altering the test runner's process state (issue8045).
1782 subprocess.check_call([
1783 sys.executable, '-c',
1784 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001785
Serhiy Storchaka43767632013-11-03 21:31:38 +02001786 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1787 def test_setregid(self):
1788 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1789 self.assertRaises(OSError, os.setregid, 0, 0)
1790 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1791 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001792
Serhiy Storchaka43767632013-11-03 21:31:38 +02001793 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1794 def test_setregid_neg1(self):
1795 # Needs to accept -1. We run this in a subprocess to avoid
1796 # altering the test runner's process state (issue8045).
1797 subprocess.check_call([
1798 sys.executable, '-c',
1799 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001800
Serhiy Storchaka43767632013-11-03 21:31:38 +02001801@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1802class Pep383Tests(unittest.TestCase):
1803 def setUp(self):
1804 if support.TESTFN_UNENCODABLE:
1805 self.dir = support.TESTFN_UNENCODABLE
1806 elif support.TESTFN_NONASCII:
1807 self.dir = support.TESTFN_NONASCII
1808 else:
1809 self.dir = support.TESTFN
1810 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001811
Serhiy Storchaka43767632013-11-03 21:31:38 +02001812 bytesfn = []
1813 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001814 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001815 fn = os.fsencode(fn)
1816 except UnicodeEncodeError:
1817 return
1818 bytesfn.append(fn)
1819 add_filename(support.TESTFN_UNICODE)
1820 if support.TESTFN_UNENCODABLE:
1821 add_filename(support.TESTFN_UNENCODABLE)
1822 if support.TESTFN_NONASCII:
1823 add_filename(support.TESTFN_NONASCII)
1824 if not bytesfn:
1825 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001826
Serhiy Storchaka43767632013-11-03 21:31:38 +02001827 self.unicodefn = set()
1828 os.mkdir(self.dir)
1829 try:
1830 for fn in bytesfn:
1831 support.create_empty_file(os.path.join(self.bdir, fn))
1832 fn = os.fsdecode(fn)
1833 if fn in self.unicodefn:
1834 raise ValueError("duplicate filename")
1835 self.unicodefn.add(fn)
1836 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001837 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001838 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001839
Serhiy Storchaka43767632013-11-03 21:31:38 +02001840 def tearDown(self):
1841 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001842
Serhiy Storchaka43767632013-11-03 21:31:38 +02001843 def test_listdir(self):
1844 expected = self.unicodefn
1845 found = set(os.listdir(self.dir))
1846 self.assertEqual(found, expected)
1847 # test listdir without arguments
1848 current_directory = os.getcwd()
1849 try:
1850 os.chdir(os.sep)
1851 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1852 finally:
1853 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001854
Serhiy Storchaka43767632013-11-03 21:31:38 +02001855 def test_open(self):
1856 for fn in self.unicodefn:
1857 f = open(os.path.join(self.dir, fn), 'rb')
1858 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001859
Serhiy Storchaka43767632013-11-03 21:31:38 +02001860 @unittest.skipUnless(hasattr(os, 'statvfs'),
1861 "need os.statvfs()")
1862 def test_statvfs(self):
1863 # issue #9645
1864 for fn in self.unicodefn:
1865 # should not fail with file not found error
1866 fullname = os.path.join(self.dir, fn)
1867 os.statvfs(fullname)
1868
1869 def test_stat(self):
1870 for fn in self.unicodefn:
1871 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001872
Brian Curtineb24d742010-04-12 17:16:38 +00001873@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1874class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001875 def _kill(self, sig):
1876 # Start sys.executable as a subprocess and communicate from the
1877 # subprocess to the parent that the interpreter is ready. When it
1878 # becomes ready, send *sig* via os.kill to the subprocess and check
1879 # that the return code is equal to *sig*.
1880 import ctypes
1881 from ctypes import wintypes
1882 import msvcrt
1883
1884 # Since we can't access the contents of the process' stdout until the
1885 # process has exited, use PeekNamedPipe to see what's inside stdout
1886 # without waiting. This is done so we can tell that the interpreter
1887 # is started and running at a point where it could handle a signal.
1888 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1889 PeekNamedPipe.restype = wintypes.BOOL
1890 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1891 ctypes.POINTER(ctypes.c_char), # stdout buf
1892 wintypes.DWORD, # Buffer size
1893 ctypes.POINTER(wintypes.DWORD), # bytes read
1894 ctypes.POINTER(wintypes.DWORD), # bytes avail
1895 ctypes.POINTER(wintypes.DWORD)) # bytes left
1896 msg = "running"
1897 proc = subprocess.Popen([sys.executable, "-c",
1898 "import sys;"
1899 "sys.stdout.write('{}');"
1900 "sys.stdout.flush();"
1901 "input()".format(msg)],
1902 stdout=subprocess.PIPE,
1903 stderr=subprocess.PIPE,
1904 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001905 self.addCleanup(proc.stdout.close)
1906 self.addCleanup(proc.stderr.close)
1907 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001908
1909 count, max = 0, 100
1910 while count < max and proc.poll() is None:
1911 # Create a string buffer to store the result of stdout from the pipe
1912 buf = ctypes.create_string_buffer(len(msg))
1913 # Obtain the text currently in proc.stdout
1914 # Bytes read/avail/left are left as NULL and unused
1915 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1916 buf, ctypes.sizeof(buf), None, None, None)
1917 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1918 if buf.value:
1919 self.assertEqual(msg, buf.value.decode())
1920 break
1921 time.sleep(0.1)
1922 count += 1
1923 else:
1924 self.fail("Did not receive communication from the subprocess")
1925
Brian Curtineb24d742010-04-12 17:16:38 +00001926 os.kill(proc.pid, sig)
1927 self.assertEqual(proc.wait(), sig)
1928
1929 def test_kill_sigterm(self):
1930 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001931 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001932
1933 def test_kill_int(self):
1934 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001935 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001936
1937 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001938 tagname = "test_os_%s" % uuid.uuid1()
1939 m = mmap.mmap(-1, 1, tagname)
1940 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001941 # Run a script which has console control handling enabled.
1942 proc = subprocess.Popen([sys.executable,
1943 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001944 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001945 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1946 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001947 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001948 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001949 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001950 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001951 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001952 count += 1
1953 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001954 # Forcefully kill the process if we weren't able to signal it.
1955 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001956 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001957 os.kill(proc.pid, event)
1958 # proc.send_signal(event) could also be done here.
1959 # Allow time for the signal to be passed and the process to exit.
1960 time.sleep(0.5)
1961 if not proc.poll():
1962 # Forcefully kill the process if we weren't able to signal it.
1963 os.kill(proc.pid, signal.SIGINT)
1964 self.fail("subprocess did not stop on {}".format(name))
1965
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001966 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001967 def test_CTRL_C_EVENT(self):
1968 from ctypes import wintypes
1969 import ctypes
1970
1971 # Make a NULL value by creating a pointer with no argument.
1972 NULL = ctypes.POINTER(ctypes.c_int)()
1973 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1974 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1975 wintypes.BOOL)
1976 SetConsoleCtrlHandler.restype = wintypes.BOOL
1977
1978 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001979 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001980 # by subprocesses.
1981 SetConsoleCtrlHandler(NULL, 0)
1982
1983 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1984
1985 def test_CTRL_BREAK_EVENT(self):
1986 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1987
1988
Brian Curtind40e6f72010-07-08 21:39:08 +00001989@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001990class Win32ListdirTests(unittest.TestCase):
1991 """Test listdir on Windows."""
1992
1993 def setUp(self):
1994 self.created_paths = []
1995 for i in range(2):
1996 dir_name = 'SUB%d' % i
1997 dir_path = os.path.join(support.TESTFN, dir_name)
1998 file_name = 'FILE%d' % i
1999 file_path = os.path.join(support.TESTFN, file_name)
2000 os.makedirs(dir_path)
2001 with open(file_path, 'w') as f:
2002 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2003 self.created_paths.extend([dir_name, file_name])
2004 self.created_paths.sort()
2005
2006 def tearDown(self):
2007 shutil.rmtree(support.TESTFN)
2008
2009 def test_listdir_no_extended_path(self):
2010 """Test when the path is not an "extended" path."""
2011 # unicode
2012 self.assertEqual(
2013 sorted(os.listdir(support.TESTFN)),
2014 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002015
Tim Golden781bbeb2013-10-25 20:24:06 +01002016 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002017 self.assertEqual(
2018 sorted(os.listdir(os.fsencode(support.TESTFN))),
2019 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002020
2021 def test_listdir_extended_path(self):
2022 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002023 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002024 # unicode
2025 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2026 self.assertEqual(
2027 sorted(os.listdir(path)),
2028 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002029
Tim Golden781bbeb2013-10-25 20:24:06 +01002030 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002031 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2032 self.assertEqual(
2033 sorted(os.listdir(path)),
2034 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002035
2036
2037@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002038@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002039class Win32SymlinkTests(unittest.TestCase):
2040 filelink = 'filelinktest'
2041 filelink_target = os.path.abspath(__file__)
2042 dirlink = 'dirlinktest'
2043 dirlink_target = os.path.dirname(filelink_target)
2044 missing_link = 'missing link'
2045
2046 def setUp(self):
2047 assert os.path.exists(self.dirlink_target)
2048 assert os.path.exists(self.filelink_target)
2049 assert not os.path.exists(self.dirlink)
2050 assert not os.path.exists(self.filelink)
2051 assert not os.path.exists(self.missing_link)
2052
2053 def tearDown(self):
2054 if os.path.exists(self.filelink):
2055 os.remove(self.filelink)
2056 if os.path.exists(self.dirlink):
2057 os.rmdir(self.dirlink)
2058 if os.path.lexists(self.missing_link):
2059 os.remove(self.missing_link)
2060
2061 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002062 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002063 self.assertTrue(os.path.exists(self.dirlink))
2064 self.assertTrue(os.path.isdir(self.dirlink))
2065 self.assertTrue(os.path.islink(self.dirlink))
2066 self.check_stat(self.dirlink, self.dirlink_target)
2067
2068 def test_file_link(self):
2069 os.symlink(self.filelink_target, self.filelink)
2070 self.assertTrue(os.path.exists(self.filelink))
2071 self.assertTrue(os.path.isfile(self.filelink))
2072 self.assertTrue(os.path.islink(self.filelink))
2073 self.check_stat(self.filelink, self.filelink_target)
2074
2075 def _create_missing_dir_link(self):
2076 'Create a "directory" link to a non-existent target'
2077 linkname = self.missing_link
2078 if os.path.lexists(linkname):
2079 os.remove(linkname)
2080 target = r'c:\\target does not exist.29r3c740'
2081 assert not os.path.exists(target)
2082 target_is_dir = True
2083 os.symlink(target, linkname, target_is_dir)
2084
2085 def test_remove_directory_link_to_missing_target(self):
2086 self._create_missing_dir_link()
2087 # For compatibility with Unix, os.remove will check the
2088 # directory status and call RemoveDirectory if the symlink
2089 # was created with target_is_dir==True.
2090 os.remove(self.missing_link)
2091
2092 @unittest.skip("currently fails; consider for improvement")
2093 def test_isdir_on_directory_link_to_missing_target(self):
2094 self._create_missing_dir_link()
2095 # consider having isdir return true for directory links
2096 self.assertTrue(os.path.isdir(self.missing_link))
2097
2098 @unittest.skip("currently fails; consider for improvement")
2099 def test_rmdir_on_directory_link_to_missing_target(self):
2100 self._create_missing_dir_link()
2101 # consider allowing rmdir to remove directory links
2102 os.rmdir(self.missing_link)
2103
2104 def check_stat(self, link, target):
2105 self.assertEqual(os.stat(link), os.stat(target))
2106 self.assertNotEqual(os.lstat(link), os.stat(link))
2107
Brian Curtind25aef52011-06-13 15:16:04 -05002108 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002109 self.assertEqual(os.stat(bytes_link), os.stat(target))
2110 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002111
2112 def test_12084(self):
2113 level1 = os.path.abspath(support.TESTFN)
2114 level2 = os.path.join(level1, "level2")
2115 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002116 self.addCleanup(support.rmtree, level1)
2117
2118 os.mkdir(level1)
2119 os.mkdir(level2)
2120 os.mkdir(level3)
2121
2122 file1 = os.path.abspath(os.path.join(level1, "file1"))
2123 create_file(file1)
2124
2125 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002126 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002127 os.chdir(level2)
2128 link = os.path.join(level2, "link")
2129 os.symlink(os.path.relpath(file1), "link")
2130 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002131
Victor Stinnerae39d232016-03-24 17:12:55 +01002132 # Check os.stat calls from the same dir as the link
2133 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002134
Victor Stinnerae39d232016-03-24 17:12:55 +01002135 # Check os.stat calls from a dir below the link
2136 os.chdir(level1)
2137 self.assertEqual(os.stat(file1),
2138 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002139
Victor Stinnerae39d232016-03-24 17:12:55 +01002140 # Check os.stat calls from a dir above the link
2141 os.chdir(level3)
2142 self.assertEqual(os.stat(file1),
2143 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002144 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002145 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002146
Brian Curtind40e6f72010-07-08 21:39:08 +00002147
Tim Golden0321cf22014-05-05 19:46:17 +01002148@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2149class Win32JunctionTests(unittest.TestCase):
2150 junction = 'junctiontest'
2151 junction_target = os.path.dirname(os.path.abspath(__file__))
2152
2153 def setUp(self):
2154 assert os.path.exists(self.junction_target)
2155 assert not os.path.exists(self.junction)
2156
2157 def tearDown(self):
2158 if os.path.exists(self.junction):
2159 # os.rmdir delegates to Windows' RemoveDirectoryW,
2160 # which removes junction points safely.
2161 os.rmdir(self.junction)
2162
2163 def test_create_junction(self):
2164 _winapi.CreateJunction(self.junction_target, self.junction)
2165 self.assertTrue(os.path.exists(self.junction))
2166 self.assertTrue(os.path.isdir(self.junction))
2167
2168 # Junctions are not recognized as links.
2169 self.assertFalse(os.path.islink(self.junction))
2170
2171 def test_unlink_removes_junction(self):
2172 _winapi.CreateJunction(self.junction_target, self.junction)
2173 self.assertTrue(os.path.exists(self.junction))
2174
2175 os.unlink(self.junction)
2176 self.assertFalse(os.path.exists(self.junction))
2177
2178
Jason R. Coombs3a092862013-05-27 23:21:28 -04002179@support.skip_unless_symlink
2180class NonLocalSymlinkTests(unittest.TestCase):
2181
2182 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002183 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002184 Create this structure:
2185
2186 base
2187 \___ some_dir
2188 """
2189 os.makedirs('base/some_dir')
2190
2191 def tearDown(self):
2192 shutil.rmtree('base')
2193
2194 def test_directory_link_nonlocal(self):
2195 """
2196 The symlink target should resolve relative to the link, not relative
2197 to the current directory.
2198
2199 Then, link base/some_link -> base/some_dir and ensure that some_link
2200 is resolved as a directory.
2201
2202 In issue13772, it was discovered that directory detection failed if
2203 the symlink target was not specified relative to the current
2204 directory, which was a defect in the implementation.
2205 """
2206 src = os.path.join('base', 'some_link')
2207 os.symlink('some_dir', src)
2208 assert os.path.isdir(src)
2209
2210
Victor Stinnere8d51452010-08-19 01:05:19 +00002211class FSEncodingTests(unittest.TestCase):
2212 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2214 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002215
Victor Stinnere8d51452010-08-19 01:05:19 +00002216 def test_identity(self):
2217 # assert fsdecode(fsencode(x)) == x
2218 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2219 try:
2220 bytesfn = os.fsencode(fn)
2221 except UnicodeEncodeError:
2222 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002223 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002224
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002225
Brett Cannonefb00c02012-02-29 18:31:31 -05002226
2227class DeviceEncodingTests(unittest.TestCase):
2228
2229 def test_bad_fd(self):
2230 # Return None when an fd doesn't actually exist.
2231 self.assertIsNone(os.device_encoding(123456))
2232
Philip Jenveye308b7c2012-02-29 16:16:15 -08002233 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2234 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002235 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002236 def test_device_encoding(self):
2237 encoding = os.device_encoding(0)
2238 self.assertIsNotNone(encoding)
2239 self.assertTrue(codecs.lookup(encoding))
2240
2241
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002242class PidTests(unittest.TestCase):
2243 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2244 def test_getppid(self):
2245 p = subprocess.Popen([sys.executable, '-c',
2246 'import os; print(os.getppid())'],
2247 stdout=subprocess.PIPE)
2248 stdout, _ = p.communicate()
2249 # We are the parent of our subprocess
2250 self.assertEqual(int(stdout), os.getpid())
2251
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002252 def test_waitpid(self):
2253 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002254 # Add an implicit test for PyUnicode_FSConverter().
2255 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002256 status = os.waitpid(pid, 0)
2257 self.assertEqual(status, (pid, 0))
2258
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002259
Victor Stinner4659ccf2016-09-14 10:57:00 +02002260class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002261 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002262 self.exitcode = 17
2263
2264 filename = support.TESTFN
2265 self.addCleanup(support.unlink, filename)
2266
2267 if not with_env:
2268 code = 'import sys; sys.exit(%s)' % self.exitcode
2269 else:
2270 self.env = dict(os.environ)
2271 # create an unique key
2272 self.key = str(uuid.uuid4())
2273 self.env[self.key] = self.key
2274 # read the variable from os.environ to check that it exists
2275 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2276 % (self.key, self.exitcode))
2277
2278 with open(filename, "w") as fp:
2279 fp.write(code)
2280
Berker Peksag81816462016-09-15 20:19:47 +03002281 args = [sys.executable, filename]
2282 if use_bytes:
2283 args = [os.fsencode(a) for a in args]
2284 self.env = {os.fsencode(k): os.fsencode(v)
2285 for k, v in self.env.items()}
2286
2287 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002288
Berker Peksag4af23d72016-09-15 20:32:44 +03002289 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002290 def test_spawnl(self):
2291 args = self.create_args()
2292 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2293 self.assertEqual(exitcode, self.exitcode)
2294
Berker Peksag4af23d72016-09-15 20:32:44 +03002295 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002296 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002297 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002298 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2299 self.assertEqual(exitcode, self.exitcode)
2300
Berker Peksag4af23d72016-09-15 20:32:44 +03002301 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002302 def test_spawnlp(self):
2303 args = self.create_args()
2304 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2305 self.assertEqual(exitcode, self.exitcode)
2306
Berker Peksag4af23d72016-09-15 20:32:44 +03002307 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002308 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002309 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002310 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2311 self.assertEqual(exitcode, self.exitcode)
2312
Berker Peksag4af23d72016-09-15 20:32:44 +03002313 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002314 def test_spawnv(self):
2315 args = self.create_args()
2316 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2317 self.assertEqual(exitcode, self.exitcode)
2318
Berker Peksag4af23d72016-09-15 20:32:44 +03002319 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002320 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002321 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002322 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2323 self.assertEqual(exitcode, self.exitcode)
2324
Berker Peksag4af23d72016-09-15 20:32:44 +03002325 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002326 def test_spawnvp(self):
2327 args = self.create_args()
2328 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2329 self.assertEqual(exitcode, self.exitcode)
2330
Berker Peksag4af23d72016-09-15 20:32:44 +03002331 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002332 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002333 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002334 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2335 self.assertEqual(exitcode, self.exitcode)
2336
Berker Peksag4af23d72016-09-15 20:32:44 +03002337 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002338 def test_nowait(self):
2339 args = self.create_args()
2340 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2341 result = os.waitpid(pid, 0)
2342 self.assertEqual(result[0], pid)
2343 status = result[1]
2344 if hasattr(os, 'WIFEXITED'):
2345 self.assertTrue(os.WIFEXITED(status))
2346 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2347 else:
2348 self.assertEqual(status, self.exitcode << 8)
2349
Berker Peksag4af23d72016-09-15 20:32:44 +03002350 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002351 def test_spawnve_bytes(self):
2352 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2353 args = self.create_args(with_env=True, use_bytes=True)
2354 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2355 self.assertEqual(exitcode, self.exitcode)
2356
Steve Dower859fd7b2016-11-19 18:53:19 -08002357 @requires_os_func('spawnl')
2358 def test_spawnl_noargs(self):
2359 args = self.create_args()
2360 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002361 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002362
2363 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002364 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002365 args = self.create_args()
2366 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002367 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002368
2369 @requires_os_func('spawnv')
2370 def test_spawnv_noargs(self):
2371 args = self.create_args()
2372 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2373 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002374 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2375 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002376
2377 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002378 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002379 args = self.create_args()
2380 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2381 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002382 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2383 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002384
Brian Curtin0151b8e2010-09-24 13:43:43 +00002385# The introduction of this TestCase caused at least two different errors on
2386# *nix buildbots. Temporarily skip this to let the buildbots move along.
2387@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002388@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2389class LoginTests(unittest.TestCase):
2390 def test_getlogin(self):
2391 user_name = os.getlogin()
2392 self.assertNotEqual(len(user_name), 0)
2393
2394
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002395@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2396 "needs os.getpriority and os.setpriority")
2397class ProgramPriorityTests(unittest.TestCase):
2398 """Tests for os.getpriority() and os.setpriority()."""
2399
2400 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002401
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002402 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2403 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2404 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002405 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2406 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002407 raise unittest.SkipTest("unable to reliably test setpriority "
2408 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002409 else:
2410 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002411 finally:
2412 try:
2413 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2414 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002415 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002416 raise
2417
2418
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002419if threading is not None:
2420 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002421
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002422 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002423
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002424 def __init__(self, conn):
2425 asynchat.async_chat.__init__(self, conn)
2426 self.in_buffer = []
2427 self.closed = False
2428 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002429
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002430 def handle_read(self):
2431 data = self.recv(4096)
2432 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002433
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002434 def get_data(self):
2435 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002436
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002437 def handle_close(self):
2438 self.close()
2439 self.closed = True
2440
2441 def handle_error(self):
2442 raise
2443
2444 def __init__(self, address):
2445 threading.Thread.__init__(self)
2446 asyncore.dispatcher.__init__(self)
2447 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2448 self.bind(address)
2449 self.listen(5)
2450 self.host, self.port = self.socket.getsockname()[:2]
2451 self.handler_instance = None
2452 self._active = False
2453 self._active_lock = threading.Lock()
2454
2455 # --- public API
2456
2457 @property
2458 def running(self):
2459 return self._active
2460
2461 def start(self):
2462 assert not self.running
2463 self.__flag = threading.Event()
2464 threading.Thread.start(self)
2465 self.__flag.wait()
2466
2467 def stop(self):
2468 assert self.running
2469 self._active = False
2470 self.join()
2471
2472 def wait(self):
2473 # wait for handler connection to be closed, then stop the server
2474 while not getattr(self.handler_instance, "closed", False):
2475 time.sleep(0.001)
2476 self.stop()
2477
2478 # --- internals
2479
2480 def run(self):
2481 self._active = True
2482 self.__flag.set()
2483 while self._active and asyncore.socket_map:
2484 self._active_lock.acquire()
2485 asyncore.loop(timeout=0.001, count=1)
2486 self._active_lock.release()
2487 asyncore.close_all()
2488
2489 def handle_accept(self):
2490 conn, addr = self.accept()
2491 self.handler_instance = self.Handler(conn)
2492
2493 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002494 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002495 handle_read = handle_connect
2496
2497 def writable(self):
2498 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002499
2500 def handle_error(self):
2501 raise
2502
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002503
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002504@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002505@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2506class TestSendfile(unittest.TestCase):
2507
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002508 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002509 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002510 not sys.platform.startswith("solaris") and \
2511 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002512 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2513 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002514
2515 @classmethod
2516 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002517 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002518 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002519
2520 @classmethod
2521 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002522 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002523 support.unlink(support.TESTFN)
2524
2525 def setUp(self):
2526 self.server = SendfileTestServer((support.HOST, 0))
2527 self.server.start()
2528 self.client = socket.socket()
2529 self.client.connect((self.server.host, self.server.port))
2530 self.client.settimeout(1)
2531 # synchronize by waiting for "220 ready" response
2532 self.client.recv(1024)
2533 self.sockno = self.client.fileno()
2534 self.file = open(support.TESTFN, 'rb')
2535 self.fileno = self.file.fileno()
2536
2537 def tearDown(self):
2538 self.file.close()
2539 self.client.close()
2540 if self.server.running:
2541 self.server.stop()
2542
2543 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2544 """A higher level wrapper representing how an application is
2545 supposed to use sendfile().
2546 """
2547 while 1:
2548 try:
2549 if self.SUPPORT_HEADERS_TRAILERS:
2550 return os.sendfile(sock, file, offset, nbytes, headers,
2551 trailers)
2552 else:
2553 return os.sendfile(sock, file, offset, nbytes)
2554 except OSError as err:
2555 if err.errno == errno.ECONNRESET:
2556 # disconnected
2557 raise
2558 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2559 # we have to retry send data
2560 continue
2561 else:
2562 raise
2563
2564 def test_send_whole_file(self):
2565 # normal send
2566 total_sent = 0
2567 offset = 0
2568 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002569 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002570 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2571 if sent == 0:
2572 break
2573 offset += sent
2574 total_sent += sent
2575 self.assertTrue(sent <= nbytes)
2576 self.assertEqual(offset, total_sent)
2577
2578 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002579 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002580 self.client.close()
2581 self.server.wait()
2582 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002583 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002584 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002585
2586 def test_send_at_certain_offset(self):
2587 # start sending a file at a certain offset
2588 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002589 offset = len(self.DATA) // 2
2590 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002591 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002592 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002593 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2594 if sent == 0:
2595 break
2596 offset += sent
2597 total_sent += sent
2598 self.assertTrue(sent <= nbytes)
2599
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002600 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002601 self.client.close()
2602 self.server.wait()
2603 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002604 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002605 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002606 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002607 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002608
2609 def test_offset_overflow(self):
2610 # specify an offset > file size
2611 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002612 try:
2613 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2614 except OSError as e:
2615 # Solaris can raise EINVAL if offset >= file length, ignore.
2616 if e.errno != errno.EINVAL:
2617 raise
2618 else:
2619 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002620 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002621 self.client.close()
2622 self.server.wait()
2623 data = self.server.handler_instance.get_data()
2624 self.assertEqual(data, b'')
2625
2626 def test_invalid_offset(self):
2627 with self.assertRaises(OSError) as cm:
2628 os.sendfile(self.sockno, self.fileno, -1, 4096)
2629 self.assertEqual(cm.exception.errno, errno.EINVAL)
2630
Martin Panterbf19d162015-09-09 01:01:13 +00002631 def test_keywords(self):
2632 # Keyword arguments should be supported
2633 os.sendfile(out=self.sockno, offset=0, count=4096,
2634 **{'in': self.fileno})
2635 if self.SUPPORT_HEADERS_TRAILERS:
2636 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002637 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002638
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002639 # --- headers / trailers tests
2640
Serhiy Storchaka43767632013-11-03 21:31:38 +02002641 @requires_headers_trailers
2642 def test_headers(self):
2643 total_sent = 0
2644 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2645 headers=[b"x" * 512])
2646 total_sent += sent
2647 offset = 4096
2648 nbytes = 4096
2649 while 1:
2650 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2651 offset, nbytes)
2652 if sent == 0:
2653 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002654 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002655 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002656
Serhiy Storchaka43767632013-11-03 21:31:38 +02002657 expected_data = b"x" * 512 + self.DATA
2658 self.assertEqual(total_sent, len(expected_data))
2659 self.client.close()
2660 self.server.wait()
2661 data = self.server.handler_instance.get_data()
2662 self.assertEqual(hash(data), hash(expected_data))
2663
2664 @requires_headers_trailers
2665 def test_trailers(self):
2666 TESTFN2 = support.TESTFN + "2"
2667 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002668
2669 self.addCleanup(support.unlink, TESTFN2)
2670 create_file(TESTFN2, file_data)
2671
2672 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002673 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2674 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002675 self.client.close()
2676 self.server.wait()
2677 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002678 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002679
Serhiy Storchaka43767632013-11-03 21:31:38 +02002680 @requires_headers_trailers
2681 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2682 'test needs os.SF_NODISKIO')
2683 def test_flags(self):
2684 try:
2685 os.sendfile(self.sockno, self.fileno, 0, 4096,
2686 flags=os.SF_NODISKIO)
2687 except OSError as err:
2688 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2689 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002690
2691
Larry Hastings9cf065c2012-06-22 16:30:09 -07002692def supports_extended_attributes():
2693 if not hasattr(os, "setxattr"):
2694 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002695
Larry Hastings9cf065c2012-06-22 16:30:09 -07002696 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002697 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002698 try:
2699 os.setxattr(fp.fileno(), b"user.test", b"")
2700 except OSError:
2701 return False
2702 finally:
2703 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002704
2705 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002706
2707
2708@unittest.skipUnless(supports_extended_attributes(),
2709 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002710# Kernels < 2.6.39 don't respect setxattr flags.
2711@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002712class ExtendedAttributeTests(unittest.TestCase):
2713
Larry Hastings9cf065c2012-06-22 16:30:09 -07002714 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002715 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002716 self.addCleanup(support.unlink, fn)
2717 create_file(fn)
2718
Benjamin Peterson799bd802011-08-31 22:15:17 -04002719 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002720 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002721 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002722
Victor Stinnerf12e5062011-10-16 22:12:03 +02002723 init_xattr = listxattr(fn)
2724 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002725
Larry Hastings9cf065c2012-06-22 16:30:09 -07002726 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002727 xattr = set(init_xattr)
2728 xattr.add("user.test")
2729 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002730 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2731 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2732 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002733
Benjamin Peterson799bd802011-08-31 22:15:17 -04002734 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002735 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002736 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002737
Benjamin Peterson799bd802011-08-31 22:15:17 -04002738 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002739 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002740 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002741
Larry Hastings9cf065c2012-06-22 16:30:09 -07002742 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002743 xattr.add("user.test2")
2744 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002745 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002746
Benjamin Peterson799bd802011-08-31 22:15:17 -04002747 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002748 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002749 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002750
Victor Stinnerf12e5062011-10-16 22:12:03 +02002751 xattr.remove("user.test")
2752 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002753 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2754 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2755 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2756 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002757 many = sorted("user.test{}".format(i) for i in range(100))
2758 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002759 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002760 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002761
Larry Hastings9cf065c2012-06-22 16:30:09 -07002762 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002763 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002764 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002765
2766 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2767 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002768
2769 def test_simple(self):
2770 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2771 os.listxattr)
2772
2773 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002774 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2775 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002776
2777 def test_fds(self):
2778 def getxattr(path, *args):
2779 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002780 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002781 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002782 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002783 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002784 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002785 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002786 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002787 def listxattr(path, *args):
2788 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002789 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002790 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2791
2792
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002793@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2794class TermsizeTests(unittest.TestCase):
2795 def test_does_not_crash(self):
2796 """Check if get_terminal_size() returns a meaningful value.
2797
2798 There's no easy portable way to actually check the size of the
2799 terminal, so let's check if it returns something sensible instead.
2800 """
2801 try:
2802 size = os.get_terminal_size()
2803 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002804 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002805 # Under win32 a generic OSError can be thrown if the
2806 # handle cannot be retrieved
2807 self.skipTest("failed to query terminal size")
2808 raise
2809
Antoine Pitroucfade362012-02-08 23:48:59 +01002810 self.assertGreaterEqual(size.columns, 0)
2811 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002812
2813 def test_stty_match(self):
2814 """Check if stty returns the same results
2815
2816 stty actually tests stdin, so get_terminal_size is invoked on
2817 stdin explicitly. If stty succeeded, then get_terminal_size()
2818 should work too.
2819 """
2820 try:
2821 size = subprocess.check_output(['stty', 'size']).decode().split()
2822 except (FileNotFoundError, subprocess.CalledProcessError):
2823 self.skipTest("stty invocation failed")
2824 expected = (int(size[1]), int(size[0])) # reversed order
2825
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002826 try:
2827 actual = os.get_terminal_size(sys.__stdin__.fileno())
2828 except OSError as e:
2829 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2830 # Under win32 a generic OSError can be thrown if the
2831 # handle cannot be retrieved
2832 self.skipTest("failed to query terminal size")
2833 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002834 self.assertEqual(expected, actual)
2835
2836
Victor Stinner292c8352012-10-30 02:17:38 +01002837class OSErrorTests(unittest.TestCase):
2838 def setUp(self):
2839 class Str(str):
2840 pass
2841
Victor Stinnerafe17062012-10-31 22:47:43 +01002842 self.bytes_filenames = []
2843 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002844 if support.TESTFN_UNENCODABLE is not None:
2845 decoded = support.TESTFN_UNENCODABLE
2846 else:
2847 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002848 self.unicode_filenames.append(decoded)
2849 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002850 if support.TESTFN_UNDECODABLE is not None:
2851 encoded = support.TESTFN_UNDECODABLE
2852 else:
2853 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002854 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002855 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002856 self.bytes_filenames.append(memoryview(encoded))
2857
2858 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002859
2860 def test_oserror_filename(self):
2861 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002862 (self.filenames, os.chdir,),
2863 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002864 (self.filenames, os.lstat,),
2865 (self.filenames, os.open, os.O_RDONLY),
2866 (self.filenames, os.rmdir,),
2867 (self.filenames, os.stat,),
2868 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002869 ]
2870 if sys.platform == "win32":
2871 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002872 (self.bytes_filenames, os.rename, b"dst"),
2873 (self.bytes_filenames, os.replace, b"dst"),
2874 (self.unicode_filenames, os.rename, "dst"),
2875 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002876 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002877 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002878 else:
2879 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002880 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002881 (self.filenames, os.rename, "dst"),
2882 (self.filenames, os.replace, "dst"),
2883 ))
2884 if hasattr(os, "chown"):
2885 funcs.append((self.filenames, os.chown, 0, 0))
2886 if hasattr(os, "lchown"):
2887 funcs.append((self.filenames, os.lchown, 0, 0))
2888 if hasattr(os, "truncate"):
2889 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002890 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002891 funcs.append((self.filenames, os.chflags, 0))
2892 if hasattr(os, "lchflags"):
2893 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002894 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002895 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002896 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002897 if sys.platform == "win32":
2898 funcs.append((self.bytes_filenames, os.link, b"dst"))
2899 funcs.append((self.unicode_filenames, os.link, "dst"))
2900 else:
2901 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002902 if hasattr(os, "listxattr"):
2903 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002904 (self.filenames, os.listxattr,),
2905 (self.filenames, os.getxattr, "user.test"),
2906 (self.filenames, os.setxattr, "user.test", b'user'),
2907 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002908 ))
2909 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002910 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002911 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002912 if sys.platform == "win32":
2913 funcs.append((self.unicode_filenames, os.readlink,))
2914 else:
2915 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002916
Steve Dowercc16be82016-09-08 10:35:16 -07002917
Victor Stinnerafe17062012-10-31 22:47:43 +01002918 for filenames, func, *func_args in funcs:
2919 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002920 try:
Steve Dowercc16be82016-09-08 10:35:16 -07002921 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01002922 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002923 else:
2924 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2925 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002926 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07002927 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08002928 except UnicodeDecodeError:
2929 pass
Victor Stinner292c8352012-10-30 02:17:38 +01002930 else:
2931 self.fail("No exception thrown by {}".format(func))
2932
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002933class CPUCountTests(unittest.TestCase):
2934 def test_cpu_count(self):
2935 cpus = os.cpu_count()
2936 if cpus is not None:
2937 self.assertIsInstance(cpus, int)
2938 self.assertGreater(cpus, 0)
2939 else:
2940 self.skipTest("Could not determine the number of CPUs")
2941
Victor Stinnerdaf45552013-08-28 00:53:59 +02002942
2943class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002944 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002945 fd = os.open(__file__, os.O_RDONLY)
2946 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002947 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002948
Victor Stinnerdaf45552013-08-28 00:53:59 +02002949 os.set_inheritable(fd, True)
2950 self.assertEqual(os.get_inheritable(fd), True)
2951
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002952 @unittest.skipIf(fcntl is None, "need fcntl")
2953 def test_get_inheritable_cloexec(self):
2954 fd = os.open(__file__, os.O_RDONLY)
2955 self.addCleanup(os.close, fd)
2956 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002957
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002958 # clear FD_CLOEXEC flag
2959 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2960 flags &= ~fcntl.FD_CLOEXEC
2961 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002962
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002963 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002964
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002965 @unittest.skipIf(fcntl is None, "need fcntl")
2966 def test_set_inheritable_cloexec(self):
2967 fd = os.open(__file__, os.O_RDONLY)
2968 self.addCleanup(os.close, fd)
2969 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2970 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002971
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002972 os.set_inheritable(fd, True)
2973 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2974 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002975
Victor Stinnerdaf45552013-08-28 00:53:59 +02002976 def test_open(self):
2977 fd = os.open(__file__, os.O_RDONLY)
2978 self.addCleanup(os.close, fd)
2979 self.assertEqual(os.get_inheritable(fd), False)
2980
2981 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2982 def test_pipe(self):
2983 rfd, wfd = os.pipe()
2984 self.addCleanup(os.close, rfd)
2985 self.addCleanup(os.close, wfd)
2986 self.assertEqual(os.get_inheritable(rfd), False)
2987 self.assertEqual(os.get_inheritable(wfd), False)
2988
2989 def test_dup(self):
2990 fd1 = os.open(__file__, os.O_RDONLY)
2991 self.addCleanup(os.close, fd1)
2992
2993 fd2 = os.dup(fd1)
2994 self.addCleanup(os.close, fd2)
2995 self.assertEqual(os.get_inheritable(fd2), False)
2996
2997 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2998 def test_dup2(self):
2999 fd = os.open(__file__, os.O_RDONLY)
3000 self.addCleanup(os.close, fd)
3001
3002 # inheritable by default
3003 fd2 = os.open(__file__, os.O_RDONLY)
3004 try:
3005 os.dup2(fd, fd2)
3006 self.assertEqual(os.get_inheritable(fd2), True)
3007 finally:
3008 os.close(fd2)
3009
3010 # force non-inheritable
3011 fd3 = os.open(__file__, os.O_RDONLY)
3012 try:
3013 os.dup2(fd, fd3, inheritable=False)
3014 self.assertEqual(os.get_inheritable(fd3), False)
3015 finally:
3016 os.close(fd3)
3017
3018 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3019 def test_openpty(self):
3020 master_fd, slave_fd = os.openpty()
3021 self.addCleanup(os.close, master_fd)
3022 self.addCleanup(os.close, slave_fd)
3023 self.assertEqual(os.get_inheritable(master_fd), False)
3024 self.assertEqual(os.get_inheritable(slave_fd), False)
3025
3026
Brett Cannon3f9183b2016-08-26 14:44:48 -07003027class PathTConverterTests(unittest.TestCase):
3028 # tuples of (function name, allows fd arguments, additional arguments to
3029 # function, cleanup function)
3030 functions = [
3031 ('stat', True, (), None),
3032 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003033 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003034 ('chflags', False, (0,), None),
3035 ('lchflags', False, (0,), None),
3036 ('open', False, (0,), getattr(os, 'close', None)),
3037 ]
3038
3039 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003040 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003041 if os.name == 'nt':
3042 bytes_fspath = bytes_filename = None
3043 else:
3044 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003045 bytes_fspath = _PathLike(bytes_filename)
3046 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003047 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003048 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003049
Brett Cannonec6ce872016-09-06 15:50:29 -07003050 int_fspath = _PathLike(fd)
3051 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003052
3053 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3054 with self.subTest(name=name):
3055 try:
3056 fn = getattr(os, name)
3057 except AttributeError:
3058 continue
3059
Brett Cannon8f96a302016-08-26 19:30:11 -07003060 for path in (str_filename, bytes_filename, str_fspath,
3061 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003062 if path is None:
3063 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003064 with self.subTest(name=name, path=path):
3065 result = fn(path, *extra_args)
3066 if cleanup_fn is not None:
3067 cleanup_fn(result)
3068
3069 with self.assertRaisesRegex(
3070 TypeError, 'should be string, bytes'):
3071 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003072
3073 if allow_fd:
3074 result = fn(fd, *extra_args) # should not fail
3075 if cleanup_fn is not None:
3076 cleanup_fn(result)
3077 else:
3078 with self.assertRaisesRegex(
3079 TypeError,
3080 'os.PathLike'):
3081 fn(fd, *extra_args)
3082
3083
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003084@unittest.skipUnless(hasattr(os, 'get_blocking'),
3085 'needs os.get_blocking() and os.set_blocking()')
3086class BlockingTests(unittest.TestCase):
3087 def test_blocking(self):
3088 fd = os.open(__file__, os.O_RDONLY)
3089 self.addCleanup(os.close, fd)
3090 self.assertEqual(os.get_blocking(fd), True)
3091
3092 os.set_blocking(fd, False)
3093 self.assertEqual(os.get_blocking(fd), False)
3094
3095 os.set_blocking(fd, True)
3096 self.assertEqual(os.get_blocking(fd), True)
3097
3098
Yury Selivanov97e2e062014-09-26 12:33:06 -04003099
3100class ExportsTests(unittest.TestCase):
3101 def test_os_all(self):
3102 self.assertIn('open', os.__all__)
3103 self.assertIn('walk', os.__all__)
3104
3105
Victor Stinner6036e442015-03-08 01:58:04 +01003106class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003107 check_no_resource_warning = support.check_no_resource_warning
3108
Victor Stinner6036e442015-03-08 01:58:04 +01003109 def setUp(self):
3110 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003111 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003112 self.addCleanup(support.rmtree, self.path)
3113 os.mkdir(self.path)
3114
3115 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003116 path = self.bytes_path if isinstance(name, bytes) else self.path
3117 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003118 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003119 return filename
3120
3121 def get_entries(self, names):
3122 entries = dict((entry.name, entry)
3123 for entry in os.scandir(self.path))
3124 self.assertEqual(sorted(entries.keys()), names)
3125 return entries
3126
3127 def assert_stat_equal(self, stat1, stat2, skip_fields):
3128 if skip_fields:
3129 for attr in dir(stat1):
3130 if not attr.startswith("st_"):
3131 continue
3132 if attr in ("st_dev", "st_ino", "st_nlink"):
3133 continue
3134 self.assertEqual(getattr(stat1, attr),
3135 getattr(stat2, attr),
3136 (stat1, stat2, attr))
3137 else:
3138 self.assertEqual(stat1, stat2)
3139
3140 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003141 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003142 self.assertEqual(entry.name, name)
3143 self.assertEqual(entry.path, os.path.join(self.path, name))
3144 self.assertEqual(entry.inode(),
3145 os.stat(entry.path, follow_symlinks=False).st_ino)
3146
3147 entry_stat = os.stat(entry.path)
3148 self.assertEqual(entry.is_dir(),
3149 stat.S_ISDIR(entry_stat.st_mode))
3150 self.assertEqual(entry.is_file(),
3151 stat.S_ISREG(entry_stat.st_mode))
3152 self.assertEqual(entry.is_symlink(),
3153 os.path.islink(entry.path))
3154
3155 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3156 self.assertEqual(entry.is_dir(follow_symlinks=False),
3157 stat.S_ISDIR(entry_lstat.st_mode))
3158 self.assertEqual(entry.is_file(follow_symlinks=False),
3159 stat.S_ISREG(entry_lstat.st_mode))
3160
3161 self.assert_stat_equal(entry.stat(),
3162 entry_stat,
3163 os.name == 'nt' and not is_symlink)
3164 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3165 entry_lstat,
3166 os.name == 'nt')
3167
3168 def test_attributes(self):
3169 link = hasattr(os, 'link')
3170 symlink = support.can_symlink()
3171
3172 dirname = os.path.join(self.path, "dir")
3173 os.mkdir(dirname)
3174 filename = self.create_file("file.txt")
3175 if link:
3176 os.link(filename, os.path.join(self.path, "link_file.txt"))
3177 if symlink:
3178 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3179 target_is_directory=True)
3180 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3181
3182 names = ['dir', 'file.txt']
3183 if link:
3184 names.append('link_file.txt')
3185 if symlink:
3186 names.extend(('symlink_dir', 'symlink_file.txt'))
3187 entries = self.get_entries(names)
3188
3189 entry = entries['dir']
3190 self.check_entry(entry, 'dir', True, False, False)
3191
3192 entry = entries['file.txt']
3193 self.check_entry(entry, 'file.txt', False, True, False)
3194
3195 if link:
3196 entry = entries['link_file.txt']
3197 self.check_entry(entry, 'link_file.txt', False, True, False)
3198
3199 if symlink:
3200 entry = entries['symlink_dir']
3201 self.check_entry(entry, 'symlink_dir', True, False, True)
3202
3203 entry = entries['symlink_file.txt']
3204 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3205
3206 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003207 path = self.bytes_path if isinstance(name, bytes) else self.path
3208 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003209 self.assertEqual(len(entries), 1)
3210
3211 entry = entries[0]
3212 self.assertEqual(entry.name, name)
3213 return entry
3214
Brett Cannon96881cd2016-06-10 14:37:21 -07003215 def create_file_entry(self, name='file.txt'):
3216 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003217 return self.get_entry(os.path.basename(filename))
3218
3219 def test_current_directory(self):
3220 filename = self.create_file()
3221 old_dir = os.getcwd()
3222 try:
3223 os.chdir(self.path)
3224
3225 # call scandir() without parameter: it must list the content
3226 # of the current directory
3227 entries = dict((entry.name, entry) for entry in os.scandir())
3228 self.assertEqual(sorted(entries.keys()),
3229 [os.path.basename(filename)])
3230 finally:
3231 os.chdir(old_dir)
3232
3233 def test_repr(self):
3234 entry = self.create_file_entry()
3235 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3236
Brett Cannon96881cd2016-06-10 14:37:21 -07003237 def test_fspath_protocol(self):
3238 entry = self.create_file_entry()
3239 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3240
3241 def test_fspath_protocol_bytes(self):
3242 bytes_filename = os.fsencode('bytesfile.txt')
3243 bytes_entry = self.create_file_entry(name=bytes_filename)
3244 fspath = os.fspath(bytes_entry)
3245 self.assertIsInstance(fspath, bytes)
3246 self.assertEqual(fspath,
3247 os.path.join(os.fsencode(self.path),bytes_filename))
3248
Victor Stinner6036e442015-03-08 01:58:04 +01003249 def test_removed_dir(self):
3250 path = os.path.join(self.path, 'dir')
3251
3252 os.mkdir(path)
3253 entry = self.get_entry('dir')
3254 os.rmdir(path)
3255
3256 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3257 if os.name == 'nt':
3258 self.assertTrue(entry.is_dir())
3259 self.assertFalse(entry.is_file())
3260 self.assertFalse(entry.is_symlink())
3261 if os.name == 'nt':
3262 self.assertRaises(FileNotFoundError, entry.inode)
3263 # don't fail
3264 entry.stat()
3265 entry.stat(follow_symlinks=False)
3266 else:
3267 self.assertGreater(entry.inode(), 0)
3268 self.assertRaises(FileNotFoundError, entry.stat)
3269 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3270
3271 def test_removed_file(self):
3272 entry = self.create_file_entry()
3273 os.unlink(entry.path)
3274
3275 self.assertFalse(entry.is_dir())
3276 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3277 if os.name == 'nt':
3278 self.assertTrue(entry.is_file())
3279 self.assertFalse(entry.is_symlink())
3280 if os.name == 'nt':
3281 self.assertRaises(FileNotFoundError, entry.inode)
3282 # don't fail
3283 entry.stat()
3284 entry.stat(follow_symlinks=False)
3285 else:
3286 self.assertGreater(entry.inode(), 0)
3287 self.assertRaises(FileNotFoundError, entry.stat)
3288 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3289
3290 def test_broken_symlink(self):
3291 if not support.can_symlink():
3292 return self.skipTest('cannot create symbolic link')
3293
3294 filename = self.create_file("file.txt")
3295 os.symlink(filename,
3296 os.path.join(self.path, "symlink.txt"))
3297 entries = self.get_entries(['file.txt', 'symlink.txt'])
3298 entry = entries['symlink.txt']
3299 os.unlink(filename)
3300
3301 self.assertGreater(entry.inode(), 0)
3302 self.assertFalse(entry.is_dir())
3303 self.assertFalse(entry.is_file()) # broken symlink returns False
3304 self.assertFalse(entry.is_dir(follow_symlinks=False))
3305 self.assertFalse(entry.is_file(follow_symlinks=False))
3306 self.assertTrue(entry.is_symlink())
3307 self.assertRaises(FileNotFoundError, entry.stat)
3308 # don't fail
3309 entry.stat(follow_symlinks=False)
3310
3311 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003312 self.create_file("file.txt")
3313
3314 path_bytes = os.fsencode(self.path)
3315 entries = list(os.scandir(path_bytes))
3316 self.assertEqual(len(entries), 1, entries)
3317 entry = entries[0]
3318
3319 self.assertEqual(entry.name, b'file.txt')
3320 self.assertEqual(entry.path,
3321 os.fsencode(os.path.join(self.path, 'file.txt')))
3322
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003323 @unittest.skipUnless(os.listdir in os.supports_fd,
3324 'fd support for listdir required for this test.')
3325 def test_fd(self):
3326 self.assertIn(os.scandir, os.supports_fd)
3327 self.create_file('file.txt')
3328 expected_names = ['file.txt']
3329 if support.can_symlink():
3330 os.symlink('file.txt', os.path.join(self.path, 'link'))
3331 expected_names.append('link')
3332
3333 fd = os.open(self.path, os.O_RDONLY)
3334 try:
3335 with os.scandir(fd) as it:
3336 entries = list(it)
3337 names = [entry.name for entry in entries]
3338 self.assertEqual(sorted(names), expected_names)
3339 self.assertEqual(names, os.listdir(fd))
3340 for entry in entries:
3341 self.assertEqual(entry.path, entry.name)
3342 self.assertEqual(os.fspath(entry), entry.name)
3343 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3344 if os.stat in os.supports_dir_fd:
3345 st = os.stat(entry.name, dir_fd=fd)
3346 self.assertEqual(entry.stat(), st)
3347 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3348 self.assertEqual(entry.stat(follow_symlinks=False), st)
3349 finally:
3350 os.close(fd)
3351
Victor Stinner6036e442015-03-08 01:58:04 +01003352 def test_empty_path(self):
3353 self.assertRaises(FileNotFoundError, os.scandir, '')
3354
3355 def test_consume_iterator_twice(self):
3356 self.create_file("file.txt")
3357 iterator = os.scandir(self.path)
3358
3359 entries = list(iterator)
3360 self.assertEqual(len(entries), 1, entries)
3361
3362 # check than consuming the iterator twice doesn't raise exception
3363 entries2 = list(iterator)
3364 self.assertEqual(len(entries2), 0, entries2)
3365
3366 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003367 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003368 self.assertRaises(TypeError, os.scandir, obj)
3369
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003370 def test_close(self):
3371 self.create_file("file.txt")
3372 self.create_file("file2.txt")
3373 iterator = os.scandir(self.path)
3374 next(iterator)
3375 iterator.close()
3376 # multiple closes
3377 iterator.close()
3378 with self.check_no_resource_warning():
3379 del iterator
3380
3381 def test_context_manager(self):
3382 self.create_file("file.txt")
3383 self.create_file("file2.txt")
3384 with os.scandir(self.path) as iterator:
3385 next(iterator)
3386 with self.check_no_resource_warning():
3387 del iterator
3388
3389 def test_context_manager_close(self):
3390 self.create_file("file.txt")
3391 self.create_file("file2.txt")
3392 with os.scandir(self.path) as iterator:
3393 next(iterator)
3394 iterator.close()
3395
3396 def test_context_manager_exception(self):
3397 self.create_file("file.txt")
3398 self.create_file("file2.txt")
3399 with self.assertRaises(ZeroDivisionError):
3400 with os.scandir(self.path) as iterator:
3401 next(iterator)
3402 1/0
3403 with self.check_no_resource_warning():
3404 del iterator
3405
3406 def test_resource_warning(self):
3407 self.create_file("file.txt")
3408 self.create_file("file2.txt")
3409 iterator = os.scandir(self.path)
3410 next(iterator)
3411 with self.assertWarns(ResourceWarning):
3412 del iterator
3413 support.gc_collect()
3414 # exhausted iterator
3415 iterator = os.scandir(self.path)
3416 list(iterator)
3417 with self.check_no_resource_warning():
3418 del iterator
3419
Victor Stinner6036e442015-03-08 01:58:04 +01003420
Ethan Furmancdc08792016-06-02 15:06:09 -07003421class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003422
3423 # Abstracted so it can be overridden to test pure Python implementation
3424 # if a C version is provided.
3425 fspath = staticmethod(os.fspath)
3426
Ethan Furmancdc08792016-06-02 15:06:09 -07003427 def test_return_bytes(self):
3428 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003429 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003430
3431 def test_return_string(self):
3432 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003433 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003434
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003435 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003436 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003437 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003438
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003439 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003440 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3441 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3442
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003443 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003444 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3445 self.assertTrue(issubclass(_PathLike, os.PathLike))
3446 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003447
Ethan Furmancdc08792016-06-02 15:06:09 -07003448 def test_garbage_in_exception_out(self):
3449 vapor = type('blah', (), {})
3450 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003451 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003452
3453 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003454 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003455
Brett Cannon044283a2016-07-15 10:41:49 -07003456 def test_bad_pathlike(self):
3457 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003458 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003459 # __fspath__ attribute that is not callable.
3460 c = type('foo', (), {})
3461 c.__fspath__ = 1
3462 self.assertRaises(TypeError, self.fspath, c())
3463 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003464 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003465 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003466
3467# Only test if the C version is provided, otherwise TestPEP519 already tested
3468# the pure Python implementation.
3469if hasattr(os, "_fspath"):
3470 class TestPEP519PurePython(TestPEP519):
3471
3472 """Explicitly test the pure Python implementation of os.fspath()."""
3473
3474 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003475
3476
Fred Drake2e2be372001-09-20 21:33:42 +00003477if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003478 unittest.main()