blob: cb15234079fc0dbd56d1c9df59c63acebfdebbce [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
# True only when the platform exposes os.geteuid() and the effective
# user id of this process is 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd') else False)
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the managed block under support.check_warnings().

    The single filter handed to check_warnings() is the pair
    (msg_regex, DeprecationWarning); *quiet* is forwarded unchanged.
    """
    deprecation_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(deprecation_filter, quiet=quiet):
        yield
90
91
def requires_os_func(name):
    """Return a decorator skipping the test unless the os module has *name*."""
    is_available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(is_available, reason)
94
95
class _PathLike(os.PathLike):
    """Small os.PathLike implementation wrapping an arbitrary object.

    If the wrapped object is an exception instance, __fspath__() raises it
    instead of returning it.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        wrapped = self.path
        if not isinstance(wrapped, BaseException):
            return wrapped
        raise wrapped
109
110
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so open() raises FileExistsError if *filename* already exists.
    """
    stream = open(filename, "xb", 0)
    with stream:
        stream.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions operating on support.TESTFN."""

    def setUp(self):
        # lexists() (rather than exists()) also catches a dangling symlink
        # such as the one test_symlink_keywords may leave behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs after each test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failed os.rename() call must not change the refcount of its
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor before reading the data back.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even if a check above fails,
            # so that tearDown can remove the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and expect exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Make sure the file exists before fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists: it must be overwritten and the
        # source must be gone afterwards.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        # Three bytes of content: the size checks below rely on it.
        create_file(self.fname, b"ABC")

    def _statvfs_or_skip(self):
        """Return os.statvfs(self.fname), skipping the test on ENOSYS.

        Any other OSError is re-raised so that the real failure is reported
        instead of a later NameError on an unbound result.
        """
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check the stat_result returned by os.stat(fname)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The indexed time fields are integers while the
                    # attributes may be floats: compare whole seconds.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing far past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that result.st_file_attributes is an int fitting in 32 bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            ['icacls.exe', fname, '/deny', 'Users:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
480
Victor Stinner47aacc82015-06-12 17:26:23 +0200481
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a scratch file inside a scratch directory."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[stat.ST_ATIME])
                or (st.st_mtime != st[stat.ST_MTIME])
                or (st.st_ctime != st[stat.ST_CTIME]))

    def _test_utime(self, set_time, filename=None):
        """Set timestamps through *set_time* and check what os.stat() reports.

        set_time is called as set_time(filename, (atime_ns, mtime_ns)).
        filename defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Check that set_time(self.fname) sets st_mtime close to time.time()."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of *path*'s volume, or None if unknown.

        Only implemented for Windows (via GetVolumeInformationW); None is
        returned on every other platform and when the call fails.
        """
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
670
671
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000672from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000673
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000674class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000675 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000676 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000677
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678 def setUp(self):
679 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000680 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000681 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000682 for key, value in self._reference().items():
683 os.environ[key] = value
684
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000685 def tearDown(self):
686 os.environ.clear()
687 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000688 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000689 os.environb.clear()
690 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000691
Christian Heimes90333392007-11-01 19:08:42 +0000692 def _reference(self):
693 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
694
695 def _empty_mapping(self):
696 os.environ.clear()
697 return os.environ
698
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000699 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200700 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
701 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000702 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000703 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300704 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200705 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300706 value = popen.read().strip()
707 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000708
Xavier de Gayed1415312016-07-22 12:15:29 +0200709 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
710 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000711 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200712 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
713 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300714 it = iter(popen)
715 self.assertEqual(next(it), "line1\n")
716 self.assertEqual(next(it), "line2\n")
717 self.assertEqual(next(it), "line3\n")
718 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000719
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000720 # Verify environ keys and values from the OS are of the
721 # correct str type.
722 def test_keyvalue_types(self):
723 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000724 self.assertEqual(type(key), str)
725 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000726
Christian Heimes90333392007-11-01 19:08:42 +0000727 def test_items(self):
728 for key, value in self._reference().items():
729 self.assertEqual(os.environ.get(key), value)
730
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000731 # Issue 7310
732 def test___repr__(self):
733 """Check that the repr() of os.environ looks like environ({...})."""
734 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000735 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
736 '{!r}: {!r}'.format(key, value)
737 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000738
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000739 def test_get_exec_path(self):
740 defpath_list = os.defpath.split(os.pathsep)
741 test_path = ['/monty', '/python', '', '/flying/circus']
742 test_env = {'PATH': os.pathsep.join(test_path)}
743
744 saved_environ = os.environ
745 try:
746 os.environ = dict(test_env)
747 # Test that defaulting to os.environ works.
748 self.assertSequenceEqual(test_path, os.get_exec_path())
749 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
750 finally:
751 os.environ = saved_environ
752
753 # No PATH environment variable
754 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
755 # Empty PATH environment variable
756 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
757 # Supplied PATH environment variable
758 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
759
Victor Stinnerb745a742010-05-18 17:17:23 +0000760 if os.supports_bytes_environ:
761 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000762 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000763 # ignore BytesWarning warning
764 with warnings.catch_warnings(record=True):
765 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000766 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000767 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000768 pass
769 else:
770 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000771
772 # bytes key and/or value
773 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
774 ['abc'])
775 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
776 ['abc'])
777 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
778 ['abc'])
779
780 @unittest.skipUnless(os.supports_bytes_environ,
781 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000782 def test_environb(self):
783 # os.environ -> os.environb
784 value = 'euro\u20ac'
785 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000786 value_bytes = value.encode(sys.getfilesystemencoding(),
787 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000788 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000789 msg = "U+20AC character is not encodable to %s" % (
790 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000791 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000792 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000793 self.assertEqual(os.environ['unicode'], value)
794 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000795
796 # os.environb -> os.environ
797 value = b'\xff'
798 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000799 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000801 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000802
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name raises the platform-specific error."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    with self.assertRaises(expected_error):
        del os.environ[bad_key]
Victor Stinner60b385e2011-11-22 22:01:28 +0100816
Victor Stinner6d101392013-04-14 16:35:04 +0200817 def test_key_type(self):
818 missing = 'missingkey'
819 self.assertNotIn(missing, os.environ)
820
Victor Stinner839e5ea2013-04-14 16:43:03 +0200821 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200822 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200823 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200824 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200825
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200827 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200829 self.assertTrue(cm.exception.__suppress_context__)
830
Victor Stinner6d101392013-04-14 16:35:04 +0200831
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting os.fwalk()'s 'follow_symlinks' spelling."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Build the directory tree under support.TESTFN that the tests walk.

        self.sub2_tree is set to the (root, dirs, files) triple expected for
        SUB2; its contents depend on symlink support and on whether SUB21
        could actually be made unreadable.
        """
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # NOTE(review): the file is created as SUB21/tmp3 although the
        # diagram above calls it tmp5 -- confirm which name is intended.
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            # SUB21 is created regardless of symlink support, so it belongs
            # in the expected directory list here too; it is dropped again
            # below when the directory turns out to be readable.
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # SUB21 really is unreadable: restore access before the tree is
            # removed (cleanups run in reverse order of registration).
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod(0) did not block listing, so SUB21 would be walked;
            # remove it and its entry from the expected tree instead.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        visited[3 - 2 * flipped][-1].sort()
        visited[3 - 2 * flipped][1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        """Removing a name from 'dirs' during a top-down walk prunes it.

        walk_path defaults to self.walk_path; test_file_like_path passes a
        path-like object instead.
        """
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        visited[1][1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        visited[2 - 2 * flipped][-1].sort()
        visited[2 - 2 * flipped][1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        """A directory renamed mid-walk is reported via onerror and skipped."""
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it has been listed but before it is entered.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001006
Charles-François Natali7372b062012-02-05 15:15:38 +01001007
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt os.fwalk() to os.walk()'s 3-tuples by dropping the dir fd."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then every combination of topdown and
        symlink-following is run through os.walk() and os.fwalk(); each root
        that fwalk() yields must match what walk() reported for it.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the original error
        # must not be masked by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1072
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        """Run the inherited setup, then create an ExitStack for the test."""
        super().setUp()
        self.stack = contextlib.ExitStack()

    def tearDown(self):
        """Close the ExitStack before the inherited teardown runs."""
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Walk the bytes form of *top*, yielding str-decoded triples."""
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy any edits the caller made to the decoded lists back into
            # the lists os.walk() owns.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1093
Charles-François Natali7372b062012-02-05 15:15:38 +01001094
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so the changed
        # mask cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # A regular file in the way is an error whatever exist_ok says.
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1176
Andrew Svetlov405faed2012-12-25 12:18:09 +02001177
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory at support.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values are rejected with TypeError."""
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """Switching between the first two groups is reflected by os.stat()."""
        target = support.TESTFN
        gid_1, gid_2 = groups[:2]
        uid = os.stat(target).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """With root privilege the owner can be changed to either user."""
        target = support.TESTFN
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(target).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Without root, changing the owner raises PermissionError."""
        target = support.TESTFN
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(target).st_gid
        # Both calls sit in one block: the second only runs if the first
        # did not raise (presumably when uid_1 is the current user -- verify).
        with self.assertRaises(PermissionError):
            os.chown(target, uid_1, gid)
            os.chown(target, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1230
1231
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a dira/dirb tree under support.TESTFN."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        """Create TESTFN/dira/dirb one level at a time; return both paths."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With nothing else present, the whole chain is removed."""
        dira, dirb = self._make_nested()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        """Removal stops at the first non-empty parent directory."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        """A non-empty leaf raises OSError and nothing is removed."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1271
1272
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        """os.devnull accepts writes and reads back as empty."""
        # buffering=0: write straight through, with no buffer in between.
        # The with block closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001280
Andrew Svetlov405faed2012-12-25 12:18:09 +02001281
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes, including for n == 0."""
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        """Two successive calls return bytes objects that differ."""
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child Python."""
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Check against the requested size rather than a fixed 16 so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes must not produce the same bytes."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001311
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001312
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when the running kernel lacks getrandom()."""
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        """The result is a bytes object of the requested length."""
        size = 16
        data = os.getrandom(size)
        self.assertIsInstance(data, bytes)
        self.assertEqual(len(data), size)

    def test_getrandom0(self):
        """Requesting zero bytes returns an empty bytes object."""
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        """The GRND_RANDOM flag is exposed by the os module."""
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        """GRND_NONBLOCK exists and the call succeeds or reports blocking."""
        # The call must not fail. Check also that the flag exists
        flag = os.GRND_NONBLOCK
        try:
            os.getrandom(1, flag)
        except BlockingIOError:
            # System urandom is not initialized yet
            return

    def test_getrandom_value(self):
        """Two successive 16-byte reads must differ."""
        first, second = os.getrandom(16), os.getrandom(16)
        self.assertNotEqual(first, second)
1354
1355
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001362
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it reads from a file descriptor.

    Every scenario runs in a child interpreter (assert_python_ok) because
    the scenarios lower the descriptor limit or close descriptors.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)
        # The child writes exactly one 4-byte os.urandom() result to stdout;
        # check it instead of silently discarding the output.
        self.assertEqual(len(out), 4)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Within one child the two 4-byte reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second child must not reproduce the first child's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1440
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001441
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001442@contextlib.contextmanager
1443def _execvpe_mockup(defpath=None):
1444 """
1445 Stubs out execv and execve functions when used as context manager.
1446 Records exec calls. The mock execv and execve functions always raise an
1447 exception as they would normally never return.
1448 """
1449 # A list of tuples containing (function name, first arg, args)
1450 # of calls to execv or execve that have been made.
1451 calls = []
1452
1453 def mock_execv(name, *args):
1454 calls.append(('execv', name, args))
1455 raise RuntimeError("execv called")
1456
1457 def mock_execve(name, *args):
1458 calls.append(('execve', name, args))
1459 raise OSError(errno.ENOTDIR, "execve called")
1460
1461 try:
1462 orig_execv = os.execv
1463 orig_execve = os.execve
1464 orig_defpath = os.defpath
1465 os.execv = mock_execv
1466 os.execve = mock_execve
1467 if defpath is not None:
1468 os.defpath = defpath
1469 yield calls
1470 finally:
1471 os.execv = orig_execv
1472 os.execve = orig_execve
1473 os.defpath = orig_defpath
1474
Victor Stinner4659ccf2016-09-14 10:57:00 +02001475
class ExecTests(unittest.TestCase):
    """Tests for os.execv(), os.execvpe() and the private os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument lists and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check which exec function os._execvpe() calls, and with what.

        *test_type* is either str or bytes and selects the type of the
        program name handed to os._execvpe().
        """
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path = env.copy()
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is exercised everywhere except on Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001547
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001548
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a non-existent path.

    Every test relies on support.TESTFN not existing; setUp enforces it.
    """

    def setUp(self):
        # Fail early when TESTFN exists, or when its absence cannot be
        # confirmed: the tests below would otherwise be meaningless.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass  # expected: the file does not exist
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # mkdir() must fail while a regular file of that name exists.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001583
Victor Stinnere77c9742016-03-25 10:28:23 +01001584
class TestInvalidFD(unittest.TestCase):
    """Check how fd-taking os functions react to an invalid descriptor."""

    # Functions whose only required argument is the file descriptor.
    # "close" is omitted because it doesn't raise an exception on some
    # platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(f):
        # Build a test method for os.<f>; the generated test does nothing
        # when the running platform has no such function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper

    # Generate one test_<name> method per entry of `singles`.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        first_fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for offset in range(10):
            try:
                os.fstat(first_fd + offset)
            except OSError:
                continue
            # fstat() succeeded: this descriptor is valid, stop before it.
            break
        if offset < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        result = os.closerange(first_fd, first_fd + offset - 1)
        self.assertEqual(result, None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buffers = [bytearray(10)]
        self.check(os.readv, buffers)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1683
Brian Curtin1b9df392010-11-24 20:24:31 +00001684
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it to *file2*, check both are one file."""
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as src, open(file2, "r") as dst:
            same = os.path.sameopenfile(src.fileno(), dst.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rebind the attributes so tearDown removes the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1718
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Tests for the os.set*uid() / os.set*gid() family.

    The OSError checks only run when os.getuid() is not 0; the group
    variants additionally require HAVE_WHEEL_GROUP to be false.  Ids that
    do not fit (1<<32) must raise OverflowError.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001774
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check os functions against a directory of non-ASCII file names.

    setUp creates the directory and one empty file per encodable test
    name; self.unicodefn holds the decoded names that were created.
    """

    def setUp(self):
        # Prefer the most exotic directory name the platform offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; the two optional ones may be unset.
        candidates = [support.TESTFN_UNICODE]
        candidates += [name for name in (support.TESTFN_UNENCODABLE,
                                         support.TESTFN_NONASCII)
                       if name]
        encoded_names = []
        for name in candidates:
            try:
                encoded_names.append(os.fsencode(name))
            except UnicodeEncodeError:
                continue
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown does not run when setUp fails: clean up here,
            # then let the original error propagate.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening must succeed; the content is irrelevant.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001846
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds; the else clause runs when
        # the loop ends without the message having been seen.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start a console-handler script, wait until it flags readiness
        # through a one-byte shared mmap, send *event* and require the
        # process to exit.  *name* is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test ends, pass or fail.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # Skip the kill when the child already exited, so the real
            # failure below is the one that gets reported.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 also counts as "stopped".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1961
1962
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1, FILE0 and FILE1 and remember
        # the sorted entry names for comparison with os.listdir().
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)
Tim Golden781bbeb2013-10-25 20:24:06 +01002009
2010
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symbolic links on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions instead of bare ``assert`` statements so
        # these precondition checks are not compiled away under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # Remove whichever links the test managed to create.
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target does not exist, so exists()
        # would not see it.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        # Checked with a unittest assertion so it also runs under ``-O``.
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() through the link must match the target, while lstat() of
        # the link itself must differ; checked for str and bytes paths.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # Layout: level1/file1 and level1/level2/link -> file1 (relative),
        # plus level1/level2/level3 as a deeper working directory.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from the parent of the link's dir (level1)
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a subdir of the link's dir (level3)
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002120
Brian Curtind40e6f72010-07-08 21:39:08 +00002121
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created through _winapi.CreateJunction()."""

    # The junction name is relative, so it is created in the current directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions instead of bare ``assert`` statements so
        # these precondition checks are not compiled away under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2151
2152
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Removes 'base' together with any link created inside it.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` is removed under ``python -O``, which would leave
        # this test checking nothing; use a real unittest assertion.
        self.assertTrue(os.path.isdir(src))
2183
2184
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # Values already of the result type must come back unchanged:
        # bytes through fsencode(), str through fsdecode().
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002198
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002199
Brett Cannonefb00c02012-02-29 18:31:31 -05002200
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding name
        # must be reported and must be known to the codecs registry.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2214
2215
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002216class PidTests(unittest.TestCase):
2217 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2218 def test_getppid(self):
2219 p = subprocess.Popen([sys.executable, '-c',
2220 'import os; print(os.getppid())'],
2221 stdout=subprocess.PIPE)
2222 stdout, _ = p.communicate()
2223 # We are the parent of our subprocess
2224 self.assertEqual(int(stdout), os.getpid())
2225
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002226 def test_waitpid(self):
2227 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002228 # Add an implicit test for PyUnicode_FSConverter().
2229 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002230 status = os.waitpid(pid, 0)
2231 self.assertEqual(status, (pid, 0))
2232
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002233
class SpawnTests(unittest.TestCase):
    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small exit-code script to TESTFN and return its argv.

        Sets self.exitcode to the status the script exits with.  When
        *with_env* is true, also sets self.env (a copy of os.environ plus a
        unique key stored in self.key) and makes the script read that key
        from its environment.  When *use_bytes* is true, the returned
        arguments (and self.env, if it was created) are encoded with
        os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env is only assigned when with_env is true; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, expect the exit code in the high byte.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # A missing or empty first argument must be rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        # A missing or empty first argument must be rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        # An empty argument sequence, or an empty first argument, must be
        # rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        # An empty argument sequence, or an empty first argument, must be
        # rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002358
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # os.getlogin() must report a non-empty name.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2367
2368
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which, who = os.PRIO_PROCESS, os.getpid()

        base = os.getpriority(which, who)
        os.setpriority(which, who, base + 1)
        try:
            new_prio = os.getpriority(which, who)
            if base >= 19 and new_prio <= 19:
                # Already at nice level 19 or above: the increment cannot be
                # observed, so the result would be meaningless.
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Best-effort restore of the original priority; only an EACCES
            # failure is tolerated.
            try:
                os.setpriority(which, who, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2391
2392
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection is wrapped in a Handler, which greets the
        peer with b"220 ready\\r\\n" and buffers everything it receives.
        """

        class Handler(asynchat.async_chat):

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                # Everything received so far, as a single bytes object.
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            # Block until run() has signalled that it is active.
            self.__flag.wait()

        def stop(self):
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even when the loop iteration
                # raises; the manual acquire()/release() pair left it held.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2476
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002477
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002478@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002479@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2480class TestSendfile(unittest.TestCase):
2481
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002482 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002483 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002484 not sys.platform.startswith("solaris") and \
2485 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002486 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2487 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002488
2489 @classmethod
2490 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002491 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002492 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002493
2494 @classmethod
2495 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002496 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002497 support.unlink(support.TESTFN)
2498
2499 def setUp(self):
2500 self.server = SendfileTestServer((support.HOST, 0))
2501 self.server.start()
2502 self.client = socket.socket()
2503 self.client.connect((self.server.host, self.server.port))
2504 self.client.settimeout(1)
2505 # synchronize by waiting for "220 ready" response
2506 self.client.recv(1024)
2507 self.sockno = self.client.fileno()
2508 self.file = open(support.TESTFN, 'rb')
2509 self.fileno = self.file.fileno()
2510
2511 def tearDown(self):
2512 self.file.close()
2513 self.client.close()
2514 if self.server.running:
2515 self.server.stop()
2516
2517 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2518 """A higher level wrapper representing how an application is
2519 supposed to use sendfile().
2520 """
2521 while 1:
2522 try:
2523 if self.SUPPORT_HEADERS_TRAILERS:
2524 return os.sendfile(sock, file, offset, nbytes, headers,
2525 trailers)
2526 else:
2527 return os.sendfile(sock, file, offset, nbytes)
2528 except OSError as err:
2529 if err.errno == errno.ECONNRESET:
2530 # disconnected
2531 raise
2532 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2533 # we have to retry send data
2534 continue
2535 else:
2536 raise
2537
2538 def test_send_whole_file(self):
2539 # normal send
2540 total_sent = 0
2541 offset = 0
2542 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002543 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002544 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2545 if sent == 0:
2546 break
2547 offset += sent
2548 total_sent += sent
2549 self.assertTrue(sent <= nbytes)
2550 self.assertEqual(offset, total_sent)
2551
2552 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002553 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002554 self.client.close()
2555 self.server.wait()
2556 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002557 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002558 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002559
2560 def test_send_at_certain_offset(self):
2561 # start sending a file at a certain offset
2562 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002563 offset = len(self.DATA) // 2
2564 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002565 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002566 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002567 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2568 if sent == 0:
2569 break
2570 offset += sent
2571 total_sent += sent
2572 self.assertTrue(sent <= nbytes)
2573
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002574 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002575 self.client.close()
2576 self.server.wait()
2577 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002578 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002579 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002580 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002581 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002582
2583 def test_offset_overflow(self):
2584 # specify an offset > file size
2585 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002586 try:
2587 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2588 except OSError as e:
2589 # Solaris can raise EINVAL if offset >= file length, ignore.
2590 if e.errno != errno.EINVAL:
2591 raise
2592 else:
2593 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002594 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002595 self.client.close()
2596 self.server.wait()
2597 data = self.server.handler_instance.get_data()
2598 self.assertEqual(data, b'')
2599
2600 def test_invalid_offset(self):
2601 with self.assertRaises(OSError) as cm:
2602 os.sendfile(self.sockno, self.fileno, -1, 4096)
2603 self.assertEqual(cm.exception.errno, errno.EINVAL)
2604
Martin Panterbf19d162015-09-09 01:01:13 +00002605 def test_keywords(self):
2606 # Keyword arguments should be supported
2607 os.sendfile(out=self.sockno, offset=0, count=4096,
2608 **{'in': self.fileno})
2609 if self.SUPPORT_HEADERS_TRAILERS:
2610 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002611 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002612
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002613 # --- headers / trailers tests
2614
Serhiy Storchaka43767632013-11-03 21:31:38 +02002615 @requires_headers_trailers
2616 def test_headers(self):
2617 total_sent = 0
2618 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2619 headers=[b"x" * 512])
2620 total_sent += sent
2621 offset = 4096
2622 nbytes = 4096
2623 while 1:
2624 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2625 offset, nbytes)
2626 if sent == 0:
2627 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002628 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002629 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002630
Serhiy Storchaka43767632013-11-03 21:31:38 +02002631 expected_data = b"x" * 512 + self.DATA
2632 self.assertEqual(total_sent, len(expected_data))
2633 self.client.close()
2634 self.server.wait()
2635 data = self.server.handler_instance.get_data()
2636 self.assertEqual(hash(data), hash(expected_data))
2637
2638 @requires_headers_trailers
2639 def test_trailers(self):
2640 TESTFN2 = support.TESTFN + "2"
2641 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002642
2643 self.addCleanup(support.unlink, TESTFN2)
2644 create_file(TESTFN2, file_data)
2645
2646 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002647 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2648 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002649 self.client.close()
2650 self.server.wait()
2651 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002652 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002653
Serhiy Storchaka43767632013-11-03 21:31:38 +02002654 @requires_headers_trailers
2655 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2656 'test needs os.SF_NODISKIO')
2657 def test_flags(self):
2658 try:
2659 os.sendfile(self.sockno, self.fileno, 0, 4096,
2660 flags=os.SF_NODISKIO)
2661 except OSError as err:
2662 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2663 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002664
2665
def supports_extended_attributes():
    """Return True if a "user.test" attribute can be set on a new file.

    Returns False when os.setxattr is missing or when setting the attribute
    on a freshly created TESTFN raises OSError.  TESTFN is removed afterwards
    in either case.
    """
    supported = hasattr(os, "setxattr")
    if supported:
        try:
            # "x" mode: create the probe file, failing if it already exists.
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    supported = False
        finally:
            support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002680
2681
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the os.*xattr() functions via paths, symlink flags and fds."""

    def _assert_errno(self, code, func, *args, **kwargs):
        # func(*args, **kwargs) must raise OSError carrying errno == code.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # `s` converts attribute names (str or os.fsencode); the four
        # callables are the xattr primitives under test and **kwargs is
        # forwarded to the get/set/remove calls.
        target = support.TESTFN
        self.addCleanup(support.unlink, target)
        create_file(target)

        # A fresh file has no "user.test" attribute.
        self._assert_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        init_xattr = listxattr(target)
        self.assertIsInstance(init_xattr, list)

        # Create the attribute with an empty value, then replace it.
        setxattr(target, s("user.test"), b"", **kwargs)
        expected = set(init_xattr) | {"user.test"}
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"")
        setxattr(target, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name ...
        self._assert_errno(errno.EEXIST,
                           setxattr, target, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)

        # ... and XATTR_REPLACE refuses a missing one.
        self._assert_errno(errno.ENODATA,
                           setxattr, target, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(target, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(target)), expected)
        removexattr(target, s("user.test"), **kwargs)

        # The removed attribute is gone again.
        self._assert_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, s("user.test2"), **kwargs), b"foo")

        # Round-trip a 1 KiB value.
        big_value = b"a"*1024
        setxattr(target, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(target, s("user.test"), **kwargs), big_value)
        removexattr(target, s("user.test"), **kwargs)

        # Many attributes at once must all be listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(target, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(target)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each primitive so it operates on a file descriptor opened
        # from the path instead of on the path itself.
        def fd_getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2765
2766
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping when there is no tty.

        The test is skipped on any OSError under win32 (a generic OSError
        can be thrown there if the handle cannot be retrieved) and on
        EINVAL/ENOTTY elsewhere; every other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        # Without an original stdin there is no descriptor to compare with.
        if sys.__stdin__ is None:
            self.skipTest("sys.__stdin__ is not available")

        try:
            output = subprocess.check_output(['stty', 'size'],
                                             stderr=subprocess.DEVNULL)
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")

        # "stty size" prints "<lines> <columns>"; anything else means the
        # reference value is unusable, so skip rather than error out.
        fields = output.decode().split()
        try:
            lines, columns = (int(field) for field in fields)
        except ValueError:
            self.skipTest("unexpected stty output: {!r}".format(fields))
        expected = (columns, lines)  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2809
2810
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the unencodable / undecodable names when the platform
        # provides them; fall back to the plain TESTFN otherwise.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded,
                                bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_str = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry is (filenames to try, function, *extra arguments).
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_str, os.rename, "dst"),
                (only_str, os.replace, "dst"),
                (only_str, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions that accept every filename flavour.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_str, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = only_str if on_windows else every
            funcs.append((readlink_names, os.readlink))

        for names, func, *extra in funcs:
            for name in names:
                try:
                    if not isinstance(name, (str, bytes)):
                        # Other bytes-like names are expected to emit a
                        # DeprecationWarning before the call fails.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                    else:
                        func(name, *extra)
                except UnicodeDecodeError:
                    pass
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail("No exception thrown by {}".format(func))
2906
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2915
Victor Stinnerdaf45552013-08-28 00:53:59 +02002916
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed when
        # the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

        os.set_inheritable(fd, True)
        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

        # clear FD_CLOEXEC flag
        cleared = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, cleared)

        self.assertTrue(os.get_inheritable(fd))

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        before = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(before, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        after = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(after, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertFalse(os.get_inheritable(fd))

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        for end in (rfd, wfd):
            self.assertFalse(os.get_inheritable(end))

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(fd, target)
            self.assertTrue(os.get_inheritable(target))
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(fd, target, inheritable=False)
            self.assertFalse(os.get_inheritable(target))
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for end in (master_fd, slave_fd):
            self.assertFalse(os.get_inheritable(end))
2999
3000
class PathTConverterTests(unittest.TestCase):
    """Check which argument types os functions accept as a path."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows
            bytes_fspath = bytes_filename = None
        else:
            # Use the filesystem encoding so that the test does not depend
            # on TESTFN consisting of ASCII characters only.
            bytes_filename = os.fsencode(support.TESTFN)
            bytes_fspath = _PathLike(bytes_filename)
        fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the fd, then remove the file.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = _PathLike(fd)
        str_fspath = _PathLike(str_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # function not available on this platform
                    continue

                # str, bytes and path-like objects wrapping either must
                # all be accepted.
                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    if path is None:
                        continue
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object whose __fspath__() yields an int is
                # rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
3056
3057
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        # A freshly opened descriptor is blocking; the flag then follows
        # every os.set_blocking() call.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertTrue(os.get_blocking(fd))

        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
3071
3072
Yury Selivanov97e2e062014-09-26 12:33:06 -04003073
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3078
3079
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Every test works inside a fresh directory that is removed again
        # during cleanup.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* (str or bytes) in the test directory; return its path."""
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the test directory and return {name: entry}.

        The sorted entry names must equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        When *skip_fields* is true only the st_* attributes other than
        st_dev, st_ino and st_nlink are compared.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and against os.stat().

        *is_dir* and *is_file* are the expected results when following
        symlinks; *is_symlink* is the expected is_symlink() result.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Explicit expectations supplied by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check against what os.stat() reports.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the test directory, which must be *name*."""
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return its DirEntry."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that was never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3364
Victor Stinner6036e442015-03-08 01:58:04 +01003365
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes input comes back unchanged
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str input comes back unchanged
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        as_str = "path/like/object"
        as_bytes = b"path/like/object"
        for value in (as_str, as_bytes):
            wrapped = _PathLike(value)

            self.assertEqual(value, self.fspath(wrapped))
            self.assertEqual(as_bytes, os.fsencode(wrapped))
            self.assertEqual(as_str, os.fsdecode(wrapped))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        self.assertTrue(isinstance(_PathLike(), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects without __fspath__ are rejected with TypeError.
        vapor = type('blah', (), {})
        for junk in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, junk)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, _PathLike(42))
        # __fspath__ attribute that is not callable.
        broken = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, broken())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003411
# When there is no C version, TestPEP519 has already exercised the pure
# Python implementation, so the extra class is only defined if os._fspath
# exists.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):
        """Run the TestPEP519 cases against the pure Python os._fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003420
3421
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()