blob: 5f302f6d0eead6a55eb14938fe72129b74b0c339 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but it
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
# True only when the platform exposes os.geteuid() and the effective uid is 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Context manager wrapping support.check_warnings() for one filter.

    The filter is the pair (msg_regex, DeprecationWarning); *quiet* is
    forwarded unchanged to support.check_warnings().
    """
    warning_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(warning_filter, quiet=quiet):
        yield
90
91
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    reason = 'requires os.%s' % name
    is_available = hasattr(os, name)
    return unittest.skipUnless(is_available, reason)
94
95
class _PathLike(os.PathLike):
    """os.PathLike test double wrapping an arbitrary value.

    __fspath__() returns the wrapped value, or raises it when the value is
    an exception instance.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        value = self.path
        if not isinstance(value, BaseException):
            return value
        # An exception instance was stored: raise it instead of returning.
        raise value
109
110
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it, unbuffered.

    The file is opened in exclusive-creation mode ("xb"), so open() raises
    FileExistsError when *filename* already exists.
    """
    with open(filename, "xb", 0) as stream:
        stream.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for file-descriptor level os functions operating on TESTFN."""

    def setUp(self):
        # lexists() also reports a dangling symlink, such as the one
        # test_symlink_keywords may leave at TESTFN.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs after each test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create TESTFN first so that fdopen_helper() can open it.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination is overwritten and the source disappears.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # os.open() must accept all of its parameters as keywords.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # os.symlink() must accept all of its parameters as keywords.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test works on a three-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def _statvfs_or_skip(self):
        # Return os.statvfs(self.fname).  Only ENOSYS turns into a skip;
        # any other OSError propagates so that it is reported as such
        # instead of surfacing later as an unbound local variable.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks for the str and bytes spellings of the file name.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # Time attributes are compared after truncation to int.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            ['icacls.exe', fname, '/deny', 'Users:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
480
Victor Stinner47aacc82015-06-12 17:26:23 +0200481
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() in its different calling conventions."""

    def setUp(self):
        # Each test gets a fresh directory containing one file, "f1".
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() then reports those timestamps.  *filename* defaults to
        # self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname) and check that the resulting st_mtime
        # is close to the system clock read just before the call.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way.  Use the argument
            # rather than self.fname so the helper honours its parameter.
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name of the volume holding *path* as
        # reported by GetVolumeInformationW() on Windows, or None when the
        # call fails or on any other platform.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
670
671
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000672from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000673
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000674class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000675 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000676 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000677
def setUp(self):
    # Snapshot the environment (str view, plus the bytes view when the
    # platform has one), then install the reference key/value pairs.
    self.__save = dict(os.environ)
    if os.supports_bytes_environ:
        self.__saveb = dict(os.environb)
    reference = self._reference()
    for name in reference:
        os.environ[name] = reference[name]
684
def tearDown(self):
    # Put back the environment snapshot taken in setUp().
    os.environ.clear()
    os.environ.update(self.__save)
    if not os.supports_bytes_environ:
        return
    os.environb.clear()
    os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000691
def _reference(self):
    # Key/value pairs installed in os.environ by setUp().
    return {
        "KEY1": "VALUE1",
        "KEY2": "VALUE2",
        "KEY3": "VALUE3",
    }
694
def _empty_mapping(self):
    # Empty os.environ in place and hand back that same object.
    environ = os.environ
    environ.clear()
    return environ
698
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000699 # Bug 1110478
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                     'requires a shell')
def test_update2(self):
    # A variable set through os.environ.update() must be visible to a
    # child process started with os.popen().
    os.environ.clear()
    os.environ.update(HELLO="World")
    command = "%s -c 'echo $HELLO'" % unix_shell
    with os.popen(command) as popen:
        value = popen.read().strip()
        self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000708
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                     'requires a shell')
def test_os_popen_iter(self):
    # Iterating over the object returned by os.popen() yields the output
    # line by line and then stops.
    command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
    with os.popen(command) as popen:
        lines = iter(popen)
        for expected in ("line1\n", "line2\n", "line3\n"):
            self.assertEqual(next(lines), expected)
        self.assertRaises(StopIteration, next, lines)
Christian Heimes1a13d592007-11-08 14:16:55 +0000719
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000720 # Verify environ keys and values from the OS are of the
721 # correct str type.
722 def test_keyvalue_types(self):
723 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000724 self.assertEqual(type(key), str)
725 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000726
Christian Heimes90333392007-11-01 19:08:42 +0000727 def test_items(self):
728 for key, value in self._reference().items():
729 self.assertEqual(os.environ.get(key), value)
730
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000731 # Issue 7310
732 def test___repr__(self):
733 """Check that the repr() of os.environ looks like environ({...})."""
734 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000735 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
736 '{!r}: {!r}'.format(key, value)
737 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000738
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000739 def test_get_exec_path(self):
740 defpath_list = os.defpath.split(os.pathsep)
741 test_path = ['/monty', '/python', '', '/flying/circus']
742 test_env = {'PATH': os.pathsep.join(test_path)}
743
744 saved_environ = os.environ
745 try:
746 os.environ = dict(test_env)
747 # Test that defaulting to os.environ works.
748 self.assertSequenceEqual(test_path, os.get_exec_path())
749 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
750 finally:
751 os.environ = saved_environ
752
753 # No PATH environment variable
754 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
755 # Empty PATH environment variable
756 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
757 # Supplied PATH environment variable
758 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
759
Victor Stinnerb745a742010-05-18 17:17:23 +0000760 if os.supports_bytes_environ:
761 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000762 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000763 # ignore BytesWarning warning
764 with warnings.catch_warnings(record=True):
765 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000766 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000767 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000768 pass
769 else:
770 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000771
772 # bytes key and/or value
773 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
774 ['abc'])
775 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
776 ['abc'])
777 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
778 ['abc'])
779
780 @unittest.skipUnless(os.supports_bytes_environ,
781 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000782 def test_environb(self):
783 # os.environ -> os.environb
784 value = 'euro\u20ac'
785 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000786 value_bytes = value.encode(sys.getfilesystemencoding(),
787 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000788 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000789 msg = "U+20AC character is not encodable to %s" % (
790 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000791 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000792 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000793 self.assertEqual(os.environ['unicode'], value)
794 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000795
796 # os.environb -> os.environ
797 value = b'\xff'
798 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000799 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000801 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000802
Charles-François Natali2966f102011-11-26 11:32:46 +0100803 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
804 # #13415).
805 @support.requires_freebsd_version(7)
806 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100807 def test_unset_error(self):
808 if sys.platform == "win32":
809 # an environment variable is limited to 32,767 characters
810 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100811 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100812 else:
813 # "=" is not allowed in a variable name
814 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100815 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100816
Victor Stinner6d101392013-04-14 16:35:04 +0200817 def test_key_type(self):
818 missing = 'missingkey'
819 self.assertNotIn(missing, os.environ)
820
Victor Stinner839e5ea2013-04-14 16:43:03 +0200821 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200822 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200823 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200824 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200825
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200827 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200829 self.assertTrue(cm.exception.__suppress_context__)
830
Victor Stinner6d101392013-04-14 16:35:04 +0200831
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting os.fwalk()'s ``follow_symlinks`` name."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree under support.TESTFN used by the tests."""
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp3          (its path is kept in tmp5_path)
        #           link/           a symlink to TESTFN/TEST2
        #           broken_link     -> broken
        #           broken_link2    -> tmp3/broken
        #           broken_link3    -> SUB21/tmp5
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # NOTE(review): despite the variable name, the file is called "tmp3".
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        # sub2_tree is the (root, dirs, files) entry expected for SUB2, with
        # both lists sorted.  "SUB21" must come first in the dirs list in
        # BOTH branches: it always exists at this point, and the code below
        # drops the first entry when the directory has to be removed.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # The directory really is unreadable: keep it, but make it
            # accessible again before the rmtree cleanup runs.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod(0) did not block listing here, so SUB21 cannot play the
            # "not readable" role: remove it and stop expecting it.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        """A top-down walk visits TEST1, SUB1, SUB11 and SUB2."""
        # Walk top-down.
        entries = list(self.walk(self.walk_path))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        entries[0][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(entries[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        """Removing a name from ``dirs`` stops the walk descending into it.

        *walk_path* defaults to self.walk_path; other tests pass an
        alternative representation of the same path.
        """
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        entries = []
        for root, dirs, files in self.walk(walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to entries!
                dirs.remove('SUB1')

        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        entries[1][-1].sort()
        entries[1][1].sort()
        self.assertEqual(entries[1], self.sub2_tree)

    def test_file_like_path(self):
        """The pruning test also passes when given a path-like object."""
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        """A bottom-up walk yields children before their parents."""
        # Walk bottom-up.
        entries = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(entries), 4, entries)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        entries[3][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        """With symlink following enabled the walk enters SUB2/link."""
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        """A directory that vanishes mid-walk is reported via onerror."""
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after the walk has already listed it.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001006
Charles-François Natali7372b062012-02-05 15:15:38 +01001007
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt fwalk() to walk()'s 3-tuples by dropping the directory fd."""
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); subclasses override this to vary the input."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Runs every topdown / follow-symlinks combination and checks that
        each directory reported by fwalk() was reported by os.walk() with
        the same sets of sub-directories and files.
        """
        # Work on copies so the callers' dicts are not modified.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        """fwalk() and walk() agree on the whole test tree."""
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """fwalk() agrees with walk() when given a dir_fd for the cwd."""
        # Open outside the try block: if os.open() itself fails there is no
        # descriptor to close, and the finally clause must not hide that
        # error behind a NameError on ``fd``.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        """The fd yielded for each directory refers to that directory."""
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        """Repeated fwalk() calls do not leak file descriptors."""
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1075
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        """Walk the fs-encoded *top*, presenting str results to the tests."""
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy any changes the caller made to the decoded lists back
            # into the bytes lists that os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1088
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on the fs-encoded *top*, yielding str results."""
        raw_walk = os.fwalk(os.fsencode(top), *args, **kwargs)
        for raw_root, raw_dirs, raw_files, topfd in raw_walk:
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files, topfd)
            # Copy any changes the caller made to the decoded lists back
            # into the bytes lists that os.fwalk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1100
Charles-François Natali7372b062012-02-05 15:15:38 +01001101
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        """makedirs() creates missing levels and copes with '.' components."""
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        """exist_ok=True accepts an existing directory; the default rejects it."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so that a
        # failure here cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        """exist_ok=True works whether or not the S_ISGID bit is set."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """makedirs() fails on an existing regular file, whatever exist_ok is."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always delete the file again; otherwise a failed assertion would
        # leave TESTFN non-empty and tearDown() could not remove it.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1183
Andrew Svetlov405faed2012-12-25 12:18:09 +02001184
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a support.TESTFN directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values are rejected with TypeError."""
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """The group can be switched between two of the collected groups."""
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            current_gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(current_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """With root privilege the owner can be set to either test user."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            current_uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(current_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Without root privilege, changing the owner raises PermissionError."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one context on purpose: the block passes as soon
        # as either of them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1237
1238
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return the two paths (outer, inner)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With nothing else present, the whole chain is removed."""
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for path in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(path))

    def test_remove_partial(self):
        """Removal stops at the first parent that still has content."""
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for path in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(path))

    def test_remove_nothing(self):
        """A non-empty leaf raises OSError and nothing is removed."""
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for path in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(path))
1278
1279
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        """os.devnull accepts writes and reads back as empty."""
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            # Explicit close inside the with block is kept on purpose.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001287
Andrew Svetlov405faed2012-12-25 12:18:09 +02001288
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """urandom(n) returns exactly n bytes, including n == 0."""
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        """Two successive calls return bytes objects that differ."""
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes produced by os.urandom() in a child process.

        The child writes the raw bytes to its stdout; the length of what
        comes back is checked against *count* before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Check against the requested size, not a hard-coded 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes produce different random data."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001318
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001319
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when the getrandom() call reports ENOSYS."""
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        """getrandom(16) returns a bytes object of length 16."""
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        """Asking for zero bytes yields an empty bytes object."""
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        """The GRND_RANDOM flag is exposed by the os module."""
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        """A GRND_NONBLOCK call either succeeds or raises BlockingIOError."""
        # The call must not fail. Check also that the flag exists
        flag = os.GRND_NONBLOCK
        try:
            os.getrandom(1, flag)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        """Two successive 16-byte reads differ."""
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1361
1362
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001369
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for the file-descriptor based implementation of os.urandom().

    Every scenario runs in a child interpreter (assert_python_ok) because
    the scripts lower the fd limit or close arbitrary descriptors.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit is checked here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The replacement file only contains b"x", so if urandom() kept
        # reading from the hijacked fd the two 4-byte halves would be equal.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # A second run must produce different bytes again.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1447
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001448
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Temporarily replace os.execv and os.execve with recording stubs.

    Used as a context manager; yields the list of recorded calls.  The
    stubs always raise, since a real exec call would never return.  When
    *defpath* is given, os.defpath is overridden for the duration too.
    """
    # Each entry is a (function name, first arg, remaining args) tuple
    # describing one call made to the stubbed execv or execve.
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved_execv = os.execv
        saved_execve = os.execve
        saved_defpath = os.defpath
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        # Put the real functions and default path back.
        os.execv = saved_execv
        os.execve = saved_execve
        os.defpath = saved_defpath
1481
Victor Stinner4659ccf2016-09-14 10:57:00 +02001482
class ExecTests(unittest.TestCase):
    """Argument validation of os.exec*() and lookup logic of os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, environment in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, environment)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared driver: *test_type* is str or bytes and selects the type
        # used for the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected path is the fs-encoded form, except on "nt".
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The str variant always runs; the bytes variant is not run on "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001554
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001555
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that doesn't exist."""

    def setUp(self):
        # Every test relies on support.TESTFN being absent, so fail early
        # (with the reason) when that precondition does not hold.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # mkdir() must fail while a regular file of the same name exists;
        # mode "x" creates that file and errors out if it is already there.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001590
Victor Stinnere77c9742016-03-25 10:28:23 +01001591
class TestInvalidFD(unittest.TestCase):
    """Check that fd-taking os functions report EBADF for a bad descriptor."""

    # Names of os functions whose only required argument is the fd; one
    # test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for the os function named *f*; the generated
        # test silently does nothing when os lacks that function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Inject the generated methods into the class namespace being built.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        # NOTE(review): support.make_bad_fd() presumably returns a
        # descriptor number that is not open -- confirm in test.support.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # After the loop, i is the offset of the first descriptor for which
        # fstat() succeeded, or 9 if none of the ten probed did.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        # closerange() must return None and not raise for the probed range.
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both pathconf() and fpathconf() are called with the bad fd.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both truncate() and ftruncate() are called with the bad fd.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1690
Brian Curtin1b9df392010-11-24 20:24:31 +00001691
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # Source file and the name of the hard link created from it.
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind file1/file2, so clean up whatever names are
        # current at this point.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, link it as file2 and check that both names open
        # the same underlying file.
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
                        bytes(self.file2, sys.getfilesystemencoding()))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1725
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error handling of the os.set*uid() / os.set*gid() family.

    When not running as uid 0 the calls with id 0 must fail with OSError
    (group variants only when HAVE_WHEEL_GROUP is false); an id of 1<<32
    must always raise OverflowError.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The out-of-range value is rejected in either position.
        for real, effective in ((1<<32, 0), (0, 1<<32)):
            with self.assertRaises(OverflowError):
                os.setreuid(real, effective)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The out-of-range value is rejected in either position.
        for real, effective in ((1<<32, 0), (0, 1<<32)):
            with self.assertRaises(OverflowError):
                os.setregid(real, effective)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001781
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Access files with non-ASCII names through the str-based os APIs."""

    def setUp(self):
        # Prefer the most exotic directory name that test.support offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names, in the order they are tried.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() does not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        previous_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(previous_cwd)

    def test_open(self):
        # Each file only has to be openable; it is closed again right away.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_name = os.path.join(self.dir, name)
            os.stat(full_name)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001853
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached only when the loop ended without seeing the message.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Shared one-byte mapping polled below as the child's "ready" flag.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping whatever the outcome of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1968
1969
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, remembering the sorted entry names for the assertions.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        str_entries = sorted(os.listdir(support.TESTFN))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(bytes_entries,
                         list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        str_entries = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(str_entries, self.created_paths)

        # bytes
        bytes_entries = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(bytes_entries,
                         list(map(os.fsencode, self.created_paths)))
Tim Golden781bbeb2013-10-25 20:24:06 +01002016
2017
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and stat results on Windows file/dir links."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Check preconditions with unittest assertions rather than bare
        # ``assert`` statements, which are removed when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target"""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            # The link target is stored relative to the link's directory.
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory changed above.
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002127
Brian Curtind40e6f72010-07-08 21:39:08 +00002128
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created with _winapi.CreateJunction()."""

    # The junction name is relative, so it is created in the current directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Check preconditions with unittest assertions rather than bare
        # ``assert`` statements, which are removed when running under -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2158
2159
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check directory detection for a symlink whose target is link-relative."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is removed under -O,
        # which would leave this test without any check at all.
        self.assertTrue(os.path.isdir(src))
2190
2191
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the result type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; there is nothing to
                # round-trip, so move on to the next one.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002205
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002206
Brett Cannonefb00c02012-02-29 18:31:31 -05002207
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # For fd 0 the reported encoding must be a name known to codecs.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2221
2222
class PidTests(unittest.TestCase):
    """Process-id checks for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = _PathLike(argv[0])
        pid = os.spawnv(os.P_NOWAIT, program, argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2239
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002240
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small exit-code script and return the argv that runs it.

        Sets ``self.exitcode`` to the status the script exits with.  When
        *with_env* is true, also sets ``self.env`` (a copy of os.environ
        plus one unique variable) and ``self.key`` (that variable's name);
        the script then reads the variable before exiting.  When
        *use_bytes* is true, the returned arguments, and ``self.env`` if it
        was built, are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally would raise AttributeError otherwise.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, expect the exit code in the high byte.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002365
# This TestCase triggered at least two different errors on the *nix
# buildbots when it was introduced, so it is skipped for now to let the
# buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Sanity check for os.getlogin()."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
2374
2375
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump this process's priority value by one, read it back, and then
        # try to put the original value back.
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        bumped = original + 1
        os.setpriority(os.PRIO_PROCESS, os.getpid(), bumped)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and observed <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % original)
            self.assertEqual(observed, bumped)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as exc:
                # Being denied permission to restore the old value is
                # tolerated; any other failure is reported.
                if exc.errno != errno.EACCES:
                    raise
2398
2399
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket whose asyncore loop runs in its own thread.

        The most recently accepted connection is wrapped in a ``Handler``
        (stored as ``handler_instance``) that accumulates everything it
        receives so a test can inspect it afterwards.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting that clients wait for before they start sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead of
                # letting it be swallowed.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Read back the bound address, e.g. when port 0 was requested.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is active."""
            return self._active

        def start(self):
            """Start the server thread and block until its loop has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` guarantees the lock is released even if the loop
                # iteration raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2483
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002484
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms matched here get the plain four-argument sendfile() call.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError propagates to the caller.  *headers* and *trailers* are only
        passed on when SUPPORT_HEADERS_TRAILERS is true.  Returns whatever
        os.sendfile() returns.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # disconnected (ECONNRESET) or any other failure
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # 'in' is a reserved word, so it has to be passed via ** unpacking.
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared instead of the payloads,
        # presumably to keep failure output short; confirm before changing.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are accepted outcomes for this call.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002671
2672
def supports_extended_attributes():
    """Report whether os.setxattr() works on a freshly created TESTFN.

    Returns False when os has no ``setxattr`` or when setting a
    ``user.test`` attribute on the probe file raises OSError; otherwise
    returns True.  The probe file is unlinked in every case.
    """
    if hasattr(os, "setxattr"):
        try:
            # "x" mode: creation fails if the probe file already exists.
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    usable = False
                else:
                    usable = True
        finally:
            support.unlink(support.TESTFN)
        return usable

    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07002687
2688
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr.

    The same scenario is run through plain paths, follow_symlinks=False
    and file descriptors, with attribute names given as str and as bytes.
    """

    def _assert_fails_with(self, code, func, *args, **kwargs):
        """Assert that func(*args, **kwargs) raises OSError with errno *code*."""
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario; *s* converts attribute names (str or bytes)."""
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # Reading an attribute that was never set fails with ENODATA.
        self._assert_fails_with(errno.ENODATA,
                                getxattr, path, s("user.test"), **kwargs)

        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create the attribute, then overwrite it with XATTR_REPLACE.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline) | {"user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        self._assert_fails_with(errno.EEXIST,
                                setxattr, path, s("user.test"), b"bye",
                                os.XATTR_CREATE, **kwargs)
        self._assert_fails_with(errno.ENODATA,
                                setxattr, path, s("user.test2"), b"bye",
                                os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # Once removed, the attribute is gone again.
        self._assert_fails_with(errno.ENODATA,
                                getxattr, path, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        payload = b"a"*1024
        setxattr(path, s("user.test"), payload, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), payload)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once must all be listed.
        many = sorted(["user.test{}".format(i) for i in range(100)])
        for attribute in many:
            setxattr(path, attribute, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then with bytes names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            # Start the next run (if any) from a missing file.
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers taking a path but calling the os function on an open fd.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2772
2773
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping if it cannot be queried.

        The test is skipped when the call fails with EINVAL or ENOTTY.
        Under win32 a generic OSError can be thrown if the handle cannot
        be retrieved, so every OSError leads to a skip there.  Any other
        OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # A failing stty is an expected skip condition, so keep whatever
            # it writes to stderr out of the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # PermissionError: stty exists but may not be executed.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2816
2817
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the os call."""

    def setUp(self):
        """Build the lists of bytes-like and str file names to try."""
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN

        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]
        self.unicode_filenames = [decoded, Str(decoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        """Every listed os function must fail and report the given name."""
        everything = self.filenames
        bytes_only = self.bytes_filenames
        text_only = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (names to try, function, *extra positional arguments).
        funcs = [
            (everything, os.chdir,),
            (everything, os.chmod, 0o777),
            (everything, os.lstat,),
            (everything, os.open, os.O_RDONLY),
            (everything, os.rmdir,),
            (everything, os.stat,),
            (everything, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (bytes_only, os.rename, b"dst"),
                (bytes_only, os.replace, b"dst"),
                (text_only, os.rename, "dst"),
                (text_only, os.replace, "dst"),
                (text_only, os.listdir, ),
            ]
        else:
            funcs += [
                (everything, os.listdir,),
                (everything, os.rename, "dst"),
                (everything, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((everything, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_only, os.link, b"dst"))
                funcs.append((text_only, os.link, "dst"))
            else:
                funcs.append((everything, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (everything, os.listxattr,),
                (everything, os.getxattr, "user.test"),
                (everything, os.setxattr, "user.test", b'user'),
                (everything, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((everything, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((text_only, os.readlink,))
            else:
                funcs.append((everything, os.readlink,))

        for candidates, func, *extra in funcs:
            for name in candidates:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                    else:
                        func(name, *extra)
                except UnicodeDecodeError:
                    continue
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                    continue
                self.fail("No exception thrown by {}".format(func))
2913
class CPUCountTests(unittest.TestCase):
    """Tests for os.cpu_count()."""

    def test_cpu_count(self):
        """A non-None os.cpu_count() result must be a positive int."""
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2922
Victor Stinnerdaf45552013-08-28 00:53:59 +02002923
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of file descriptors created through os."""

    def _open_this_file(self):
        """Open this source file read-only; the fd is closed on cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        for end in (read_end, write_end):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source = self._open_this_file()

        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        cases = (
            ({}, True),                       # inheritable by default
            ({'inheritable': False}, False),  # force non-inheritable
        )
        for options, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(fd, target, **options)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for end in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(end), False)
3006
3007
class PathTConverterTests(unittest.TestCase):
    """Check which argument types the path-accepting os functions take."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        """str, bytes and path-like objects work; an int __fspath__ does not."""
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = _PathLike(bytes_filename)
        fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = _PathLike(fd)
        str_fspath = _PathLike(str_filename)

        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        usable_paths = [path for path in candidates if path is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    func = getattr(os, name)
                except AttributeError:
                    # Not available on this platform.
                    continue

                for path in usable_paths:
                    with self.subTest(name=name, path=path):
                        outcome = func(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(outcome)

                # An object whose __fspath__() gives an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    func(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        func(fd, *extra_args)
                else:
                    outcome = func(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(outcome)
3063
3064
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Tests for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        """A newly opened fd is blocking and the mode can be toggled."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back again.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3078
3079
Yury Selivanov97e2e062014-09-26 12:33:06 -04003080
class ExportsTests(unittest.TestCase):
    """Check the contents of os.__all__."""

    def test_os_all(self):
        """'open' and 'walk' must both be listed in os.__all__."""
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3085
3086
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        """Create an empty scratch directory that is removed on cleanup."""
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        A bytes *name* is joined to the bytes form of the directory.
        """
        if isinstance(name, bytes):
            parent = self.bytes_path
        else:
            parent = self.path
        filename = os.path.join(parent, name)
        # Module-level create_file(), not this method.
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: entry} dict.

        The sorted entry names must equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        With *skip_fields* true only the st_* attributes other than
        st_dev, st_ino and st_nlink are compared.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Cross-check a DirEntry against os.stat() and os.path results."""
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Symlink-following view.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

        # View of the entry itself.
        unfollowed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        on_windows = os.name == 'nt'
        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # (name, is_dir, is_file, is_symlink) for every entry created.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([item[0] for item in expected])

        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the only entry of the scratch directory, named *name*."""
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        found = list(os.scandir(directory))
        self.assertEqual(len(found), 1)

        entry = found[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create a file called *name* and return its DirEntry."""
        return self.get_entry(os.path.basename(self.create_file(name=name)))

    def test_current_directory(self):
        filename = self.create_file()
        previous = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(previous)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def _check_removed_entry_stat(self, entry):
        """Check inode() and stat() of an entry whose file was removed."""
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry_stat(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry_stat(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        entries = list(os.scandir(os.fsencode(self.path)))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise exception
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for obj in (1234, 1.234, {}, []):
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3371
Victor Stinner6036e442015-03-08 01:58:04 +01003372
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and os.PathLike (PEP 519)."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        """bytes objects compare equal to what fspath() returns for them."""
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        """str objects compare equal to what fspath() returns for them."""
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        """os.fsencode()/os.fsdecode() accept path-like objects."""
        for p in ("path/like/object", b"path/like/object"):
            pathlike = _PathLike(p)

            self.assertEqual(p, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        self.assertTrue(isinstance(_PathLike(), os.PathLike))

    def test_garbage_in_exception_out(self):
        """Objects that are not str, bytes or path-like raise TypeError."""
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(junk)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        wrong_type = _PathLike(42)
        with self.assertRaises(TypeError):
            self.fspath(wrong_type)
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {})
        not_callable.__fspath__ = 1
        instance = not_callable()
        with self.assertRaises(TypeError):
            self.fspath(instance)
        # __fspath__ raises an exception.
        exploding = _PathLike(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(exploding)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003418
# os._fspath is only looked for here; when it exists, rerun the TestPEP519
# tests against it.  Otherwise TestPEP519 already tested the pure Python
# implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Run the TestPEP519 tests against os._fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003427
3428
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()