blob: 46ad2099a9f5249117e4d2eed4dee4bda7b23ce3 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
# Collect the gids of the groups that list the current user as a member,
# plus the process's own gid.  Without the grp module the list is empty.
try:
    import grp
    groups = [
        entry.gr_gid
        for entry in grp.getgrall()
        if getpass.getuser() in entry.gr_mem
    ]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []

# Collect every uid in the password database.  The list is empty when the
# pwd module is missing or does not provide getpwall().
try:
    import pwd
    all_users = [entry.pw_uid for entry in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when the platform exposes geteuid() and the effective uid is 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Context manager wrapping support.check_warnings() for one filter.

    The filter is the pair (msg_regex, DeprecationWarning); *quiet* is
    forwarded unchanged to support.check_warnings().
    """
    deprecation_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(deprecation_filter, quiet=quiet):
        yield
91
92
def requires_os_func(name):
    """Return a unittest skip decorator that requires ``os.<name>`` to exist."""
    available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(available, reason)
95
96
Brett Cannonec6ce872016-09-06 15:50:29 -070097class _PathLike(os.PathLike):
98
99 def __init__(self, path=""):
100 self.path = path
101
102 def __str__(self):
103 return str(self.path)
104
105 def __fspath__(self):
106 if isinstance(self.path, BaseException):
107 raise self.path
108 else:
109 return self.path
110
111
def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) to it, unbuffered.

    The file is opened in exclusive-creation mode ("x"), so open() raises
    FileExistsError when the file already exists.
    """
    stream = open(filename, mode="xb", buffering=0)
    with stream:
        stream.write(content)
115
116
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for fd-level ``os`` functions that operate on support.TESTFN.

    Test methods carry comments rather than docstrings so that unittest
    keeps reporting them by method name.
    """

    def setUp(self):
        # lexists() rather than exists(): a dangling symlink left at TESTFN
        # (see test_symlink_keywords) has to be removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a write or assertion fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first: fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The existing destination must be overwritten by the source.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter must be accepted by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter must be accepted by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
256
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects returned by os.stat() and os.statvfs().

    Test methods carry comments rather than docstrings so that unittest
    keeps reporting them by method name.
    """

    def setUp(self):
        # Every test gets a fresh 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (str or bytes).

        *fname* must name the 3-byte file created by setUp().
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are truncated to an int before being
                # compared with the corresponding indexed slot.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both success and
        # TypeError are accepted; any other exception fails the test.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                # Protocols below 4 name the class through a GLOBAL opcode
                # that refers to os.stat_result.
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other failure is a real error.  Swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Both success and
        # TypeError are accepted; any other exception fails the test.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Do not continue with 'result' unbound; report the real error.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has st_file_attributes as an int in 0..0xFFFFFFFF."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
483
Victor Stinner47aacc82015-06-12 17:26:23 +0200484
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() through each of its calling conventions.

    Test methods carry comments rather than docstrings so that unittest
    keeps reporting them by method name.
    """

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        # Register the cleanup before creating anything so a partially
        # built fixture is still removed.
        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        """Return True if *filename* shows sub-second timestamp resolution."""
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        """Set known timestamps through *set_time* and verify them.

        *set_time* is called as ``set_time(filename, (atime_ns, mtime_ns))``
        with both values in nanoseconds.  *filename* defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        """Convert *ns* nanoseconds (int) to seconds (float), biased upwards."""
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Check that ``set_time(self.fname)`` stamps the file with "now"."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of *path*'s volume, or None if unknown.

        Only Windows is queried (through GetVolumeInformationW); every
        other platform yields None.
        """
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # passes with the NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
673
674
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000675from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000676
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000677class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000679 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000680
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000681 def setUp(self):
682 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000683 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000684 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000685 for key, value in self._reference().items():
686 os.environ[key] = value
687
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000688 def tearDown(self):
689 os.environ.clear()
690 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000691 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000692 os.environb.clear()
693 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000694
Christian Heimes90333392007-11-01 19:08:42 +0000695 def _reference(self):
696 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
697
698 def _empty_mapping(self):
699 os.environ.clear()
700 return os.environ
701
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000702 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200703 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
704 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000705 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000706 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300707 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200708 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300709 value = popen.read().strip()
710 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000711
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                     'requires a shell')
def test_os_popen_iter(self):
    """Iterating an os.popen() object yields the output one line at a time."""
    command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
    with os.popen(command) as pipe:
        lines = iter(pipe)
        for expected in ("line1\n", "line2\n", "line3\n"):
            self.assertEqual(next(lines), expected)
        # Nothing may follow the third line.
        self.assertRaises(StopIteration, next, lines)
Christian Heimes1a13d592007-11-08 14:16:55 +0000722
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000723 # Verify environ keys and values from the OS are of the
724 # correct str type.
725 def test_keyvalue_types(self):
726 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000727 self.assertEqual(type(key), str)
728 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000729
Christian Heimes90333392007-11-01 19:08:42 +0000730 def test_items(self):
731 for key, value in self._reference().items():
732 self.assertEqual(os.environ.get(key), value)
733
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000734 # Issue 7310
735 def test___repr__(self):
736 """Check that the repr() of os.environ looks like environ({...})."""
737 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000738 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
739 '{!r}: {!r}'.format(key, value)
740 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000741
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000742 def test_get_exec_path(self):
743 defpath_list = os.defpath.split(os.pathsep)
744 test_path = ['/monty', '/python', '', '/flying/circus']
745 test_env = {'PATH': os.pathsep.join(test_path)}
746
747 saved_environ = os.environ
748 try:
749 os.environ = dict(test_env)
750 # Test that defaulting to os.environ works.
751 self.assertSequenceEqual(test_path, os.get_exec_path())
752 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
753 finally:
754 os.environ = saved_environ
755
756 # No PATH environment variable
757 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
758 # Empty PATH environment variable
759 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
760 # Supplied PATH environment variable
761 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
762
Victor Stinnerb745a742010-05-18 17:17:23 +0000763 if os.supports_bytes_environ:
764 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000765 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000766 # ignore BytesWarning warning
767 with warnings.catch_warnings(record=True):
768 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000769 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000770 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000771 pass
772 else:
773 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000774
775 # bytes key and/or value
776 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
777 ['abc'])
778 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
779 ['abc'])
780 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
781 ['abc'])
782
783 @unittest.skipUnless(os.supports_bytes_environ,
784 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000785 def test_environb(self):
786 # os.environ -> os.environb
787 value = 'euro\u20ac'
788 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000789 value_bytes = value.encode(sys.getfilesystemencoding(),
790 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000791 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000792 msg = "U+20AC character is not encodable to %s" % (
793 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000794 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000795 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000796 self.assertEqual(os.environ['unicode'], value)
797 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000798
799 # os.environb -> os.environ
800 value = b'\xff'
801 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000802 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000803 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000804 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000805
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting a variable with an invalid name raises a platform-specific error."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_name, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_name, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, bad_name)
Victor Stinner60b385e2011-11-22 22:01:28 +0100819
Victor Stinner6d101392013-04-14 16:35:04 +0200820 def test_key_type(self):
821 missing = 'missingkey'
822 self.assertNotIn(missing, os.environ)
823
Victor Stinner839e5ea2013-04-14 16:43:03 +0200824 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200825 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200827 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200828
Victor Stinner839e5ea2013-04-14 16:43:03 +0200829 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200830 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200831 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200832 self.assertTrue(cm.exception.__suppress_context__)
833
Victor Stinner6d101392013-04-14 16:35:04 +0200834
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting os.fwalk()'s 'follow_symlinks' spelling."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree sketched below under support.TESTFN.

        self.sub2_tree records the (root, dirs, files) triple expected for
        SUB2; it depends on symlink support and on whether SUB21 could
        really be made unreadable.
        """
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5          (created on disk under the name "tmp3")
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            # SUB21 exists whether or not symlinks are available, so it must
            # be listed here too; it is dropped again below when the
            # directory turns out to be readable and is removed.
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # The directory really is unreadable: keep it, and make it
            # removable again before the tree is cleaned up.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # Mode 0 did not prevent listing, so an unreadable directory
            # cannot be simulated here: remove SUB21 and forget about it.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        """A top-down walk reports the four expected directories."""
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        """Removing an entry from 'dirs' keeps the walk out of that directory."""
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        """The pruning test also works when 'top' is a path-like object."""
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        """A bottom-up walk reports children before their parents."""
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        """With symlink following enabled, the walk descends through 'link'."""
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        """A directory that vanishes mid-walk is reported through onerror."""
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it has been listed but before it is visited.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001009
Charles-François Natali7372b062012-02-05 15:15:38 +01001010
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Yield os.fwalk() results as 3-tuples, dropping the directory fd."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are run
        for every combination of topdown and symlink following.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        """os.fwalk() agrees with os.walk() on the test tree."""
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """os.fwalk() with dir_fd agrees with a plain os.walk()."""
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and its error must not be
        # replaced by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        """The fd yielded with each directory refers to that directory."""
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        """Repeated os.fwalk() calls do not leak file descriptors."""
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1075
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        """Build the shared tree, then create an ExitStack closed in tearDown()."""
        super().setUp()
        self.stack = contextlib.ExitStack()

    def tearDown(self):
        """Close the ExitStack before running the inherited teardown."""
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Run os.walk() on a bytes path while presenting str results.

        Each triple is decoded for the caller. After the caller resumes the
        generator, its (possibly pruned) str lists are encoded back into the
        bytes lists that os.walk() consults.
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files in os.walk(encoded_top, **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1096
Charles-François Natali7372b062012-02-05 15:15:38 +01001097
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        """Create the scratch directory that every test works under."""
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        """Nested creation works, including paths that contain os.curdir."""
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        """exist_ok=True tolerates an existing directory; otherwise OSError."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak a changed umask into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        """exist_ok=True works whether or not S_ISGID is set on the directory."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """makedirs() on an existing regular file fails for any exist_ok value."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the file: tearDown() can only remove directories and
        # would fail on top of the original error if the file were left.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        """Remove whatever part of the nested directory chain was created."""
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1179
Andrew Svetlov405faed2012-12-25 12:18:09 +02001180
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        """Create the directory whose ownership the tests change."""
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values are rejected with TypeError."""
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        non_integers = (-1.0, -1j, decimal.Decimal(-1),
                        fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """The group can be switched between two of the available groups."""
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """With root privilege the owner can be changed to another user."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Without root privilege, changing the owner raises PermissionError."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        """Remove the scratch directory."""
        os.rmdir(support.TESTFN)
1233
1234
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        """Create TESTFN/dira/dirb and return the two new paths."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With nothing but empty directories, the whole chain is removed."""
        dira, dirb = self._make_nested()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        """Removal stops at the first parent that still has content."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        """A non-empty leaf raises OSError and nothing is removed."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1274
1275
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        """Writes to os.devnull are accepted; reads from it return no data."""
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            # Explicit close ahead of the implicit one at the end of the
            # with block.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001283
Andrew Svetlov405faed2012-12-25 12:18:09 +02001284
Guido van Rossume7ba4952007-06-06 23:52:48 +00001285class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001286 def test_urandom_length(self):
1287 self.assertEqual(len(os.urandom(0)), 0)
1288 self.assertEqual(len(os.urandom(1)), 1)
1289 self.assertEqual(len(os.urandom(10)), 10)
1290 self.assertEqual(len(os.urandom(100)), 100)
1291 self.assertEqual(len(os.urandom(1000)), 1000)
1292
1293 def test_urandom_value(self):
1294 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001295 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001296 data2 = os.urandom(16)
1297 self.assertNotEqual(data1, data2)
1298
1299 def get_urandom_subprocess(self, count):
1300 code = '\n'.join((
1301 'import os, sys',
1302 'data = os.urandom(%s)' % count,
1303 'sys.stdout.buffer.write(data)',
1304 'sys.stdout.buffer.flush()'))
1305 out = assert_python_ok('-c', code)
1306 stdout = out[1]
1307 self.assertEqual(len(stdout), 16)
1308 return stdout
1309
1310 def test_urandom_subprocess(self):
1311 data1 = self.get_urandom_subprocess(16)
1312 data2 = self.get_urandom_subprocess(16)
1313 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001314
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001315
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when the running kernel lacks getrandom()."""
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        """The result is a bytes object of the requested length."""
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        """Asking for zero bytes yields an empty bytes object."""
        nothing = os.getrandom(0)
        self.assertEqual(nothing, b'')

    def test_getrandom_random(self):
        """The GRND_RANDOM flag is exposed by the os module."""
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        """A GRND_NONBLOCK call returns, or raises only BlockingIOError."""
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        """Two consecutive calls return different data."""
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1357
1358
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001365
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    # Tests for the file descriptor that os.urandom() keeps open on
    # /dev/urandom.  Each scenario runs in a child interpreter because it
    # closes or exhausts file descriptors.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The child writes two 4-byte reads; they must differ from each other
        # (so they were not read from the file of b"x" bytes) ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must produce different output again.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1443
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001444
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager replacing os.execv and os.execve with recording stubs.

    Yields a list that receives one (function name, first arg, args) tuple
    per call.  The stubs always raise, as the real functions would normally
    never return.  When *defpath* is not None, os.defpath is overridden too.
    The original os attributes are restored on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes before patching anything.
    saved = (os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1477
Victor Stinner4659ccf2016-09-14 10:57:00 +02001478
class ExecTests(unittest.TestCase):
    # Tests for argument validation in the os.exec*() functions and for the
    # program lookup done by the internal os._execvpe() helper.

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument sequence or an empty first argument is rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body: *test_type* is str or bytes and selects the type of
        # the program name passed to os._execvpe().
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            path_key = b'PATH'
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
            path_key = 'PATH'
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = dict(env)
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001550
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001551
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test checks that an os function raises OSError when applied to
    # support.TESTFN, which setUp() requires to be missing.

    def setUp(self):
        # Fail early if TESTFN exists, or if its absence cannot be confirmed.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # mkdir() must fail while a regular file of the same name exists.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001586
Victor Stinnere77c9742016-03-25 10:28:23 +01001587
class TestInvalidFD(unittest.TestCase):
    # Checks how os functions that take a file descriptor behave when they
    # are handed the descriptor returned by support.make_bad_fd().

    # Names of os functions called with the descriptor as their only
    # argument; a test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for os.<f>.  The generated test silently
        # does nothing when the os module has no such function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    for f in singles:
        # Relies on locals() being the class namespace while the class body
        # executes, so each assignment defines a test method.
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        # i is now the offset of the first descriptor that turned out to be
        # valid (or 9 if none was); fewer than two invalid ones is not enough.
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based function are given the bad fd.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based function are given the bad fd.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1686
Brian Curtin1b9df392010-11-24 20:24:31 +00001687
class LinkTests(unittest.TestCase):
    # Tests for os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # support.unlink() tolerates files that were never created.  The
        # attributes are re-read here because test_unicode_name() rebinds them.
        for filename in (self.file1, self.file2):
            support.unlink(filename)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and check that both names
        # refer to the same open file.
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() is the supported way to turn a str path into the
        # bytes form used by the filesystem.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1721
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Tests for the os.set*uid()/os.set*gid() family: switching to id 0 must
    # fail without the needed privileges, and ids that do not fit must raise
    # OverflowError.

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The overflow must be detected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The overflow must be detected in either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001777
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # Tests that files with non-ASCII (possibly undecodable) names can be
    # listed, opened and stat'ed through their str names.

    def setUp(self):
        # Prefer the most exotic directory name that support provides.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate file names, keeping those that can be
        # encoded with the filesystem encoding.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Whatever went wrong, remove the directory before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        expected = self.unicodefn
        found = set(os.listdir(self.dir))
        self.assertEqual(found, expected)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            without_arg = set(os.listdir())
            with_arg = set(os.listdir(os.sep))
            self.assertEqual(without_arg, with_arg)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file by its str name must succeed.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001849
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows, both with plain exit codes and with
    # console control events.

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached on timeout and also when the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py, wait until it sets the first byte of
        # a shared tagged mmap to 1, send it *event* with os.kill() and
        # check that it stops.  *name* is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # Skip the kill when the child already exited on its own, so
            # that an OSError from os.kill() cannot mask the failure below.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 also means that it stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1964
1965
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two files and record
        # their names, sorted, for comparison with os.listdir().
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002012
2013
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os and os.path on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link name is taken yet.
        # unittest assertions are used instead of bare ``assert`` so the
        # checks are still performed when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # lexists() does not follow the link, so a link whose target has
        # gone missing is still cleaned up.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # Layout: level1/file1, level1/level2/link -> relative path of
        # file1, and level1/level2/level3.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from the parent of the link's directory
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a subdirectory of the link's directory
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002123
Brian Curtind40e6f72010-07-08 21:39:08 +00002124
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Junction points created through _winapi.CreateJunction()."""

    # The junction name is relative, so it lives in the current directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and the junction name is free.
        # unittest assertions are used instead of bare ``assert`` so the
        # checks are still performed when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        # lexists() does not follow the junction, so it is removed even if
        # its target is no longer reachable.
        if os.path.lexists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2154
2155
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Relative symlink targets that are not relative to the cwd."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        # Remove the tree created by setUp(), including any link added to it.
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` would be compiled away under -O and leave this
        # test without any check at all.
        self.assertTrue(os.path.isdir(src))
2186
2187
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() are
        # expected to come back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), b'abc\xff')
        self.assertEqual(os.fsdecode(text), 'abc\u0141')

    def test_identity(self):
        # Round trip: fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # The name cannot be encoded here; nothing to round-trip.
                continue
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002201
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002202
Brett Cannonefb00c02012-02-29 18:31:31 -05002203
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo')
                and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding name
        # is expected and it must be known to the codecs registry.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2217
2218
class PidTests(unittest.TestCase):
    """Process-id related checks: os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = _PathLike(argv[0])
        pid = os.spawnv(os.P_NOWAIT, program, argv)
        # The child runs "pass", so it is expected to report status 0.
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2235
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002236
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to support.TESTFN and return its argv.

        The script exits with ``self.exitcode``.  With *with_env* true,
        ``self.env`` is set to a copy of os.environ plus one unique variable
        (``self.key``), and the script reads that variable before exiting.
        With *use_bytes* true, the arguments (and ``self.env``, when it was
        built) are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally would fail with AttributeError.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, the exit code is expected in the
            # high byte of the status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    # The *_noargs tests check that a missing argument list, or one whose
    # first argument is empty, is rejected with ValueError.

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002361
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced, so it is skipped for now to keep the buildbots
# moving.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin() (currently always skipped)."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2370
2371
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Starting at 19 or above, the increment cannot be checked.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; only a permission
            # failure (EACCES) is tolerated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2394
2395
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket driven by asyncore in its own thread.

        An accepted connection is wrapped in a Handler, which greets the
        peer and stores everything it receives so a test can read it back
        with ``handler_instance.get_data()``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that accumulates received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client can wait for to synchronize with us.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception being handled instead of
                # swallowing it.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the port may have been
            # given as 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between run() starting and stop() being called."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even when asyncore.loop()
                # raises (both handle_error() methods re-raise).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # Read events on the listening socket are handled the same way.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception being handled instead of swallowing it.
            raise
2479
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002480
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        # Port 0: the server reports the port it actually bound.
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(),
                         trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Calls os.sendfile() until it succeeds and returns the number of
        bytes sent.  *headers* and *trailers* are only forwarded when
        SUPPORT_HEADERS_TRAILERS is true.  EAGAIN and EBUSY trigger a
        retry; any other OSError (ECONNRESET included) is propagated.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # we have to retry send data on EAGAIN/EBUSY only
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, hence the ** unpacking)
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are accepted outcomes for this flag.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002667
2668
def supports_extended_attributes():
    """Return True if os.setxattr() works on a file at support.TESTFN.

    False is returned when os.setxattr is missing or when setting a
    "user.test" attribute on a freshly created file raises OSError.  The
    probe file is removed in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    supported = True
    try:
        # "x" mode: the probe file must not exist beforehand.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002683
2684
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr(), setxattr(), removexattr() and listxattr()."""

    def _assert_errno(self, expected_errno, func, *args, **kwargs):
        """Check that func(*args, **kwargs) raises OSError with the given errno."""
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario, converting attribute names with *s*.

        *s* maps a str attribute name to the type under test (str or bytes).
        The four callables take a path first and mirror the os.*xattr
        functions; **kwargs is forwarded to every get/set/remove call.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # Reading an attribute that was never set must fail.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        self._assert_errno(errno.EEXIST,
                           setxattr, path, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)
        self._assert_errno(errno.ENODATA,
                           setxattr, path, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone, the other one is untouched.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # Round-trip a 1 KiB value.
        payload = b"a"*1024
        setxattr(path, s("user.test"), payload, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), payload)
        removexattr(path, s("user.test"), **kwargs)

        # Set 100 attributes and check that all of them are listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the scratch file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def via_fd(func, *open_args):
            # Adapt an os.*xattr function so that it receives a file
            # descriptor for *path* instead of the path itself.
            def wrapper(path, *args):
                with open(path, *open_args) as fp:
                    return func(fp.fileno(), *args)
            return wrapper

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb", 0),
                           via_fd(os.removexattr, "wb", 0),
                           via_fd(os.listxattr, "rb"))
2768
2769
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size(self, *args):
        """Return os.get_terminal_size(*args), skipping the test if it cannot work.

        The test is skipped on any OSError under win32 and on EINVAL or
        ENOTTY elsewhere; every other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        if sys.__stdin__ is None:
            # Without an original stdin there is no descriptor to query.
            self.skipTest("sys.__stdin__ is not available")

        try:
            # Discard stty's diagnostics: a failure is reported through
            # the skip below, not through noise on the test's stderr.
            output = subprocess.check_output(['stty', 'size'],
                                             stderr=subprocess.DEVNULL)
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        size = output.decode().split()
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2812
2813
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the os call."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer names that cannot be encoded/decoded when the platform
        # provides them, otherwise fall back to the regular scratch name.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded,
                                bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def _invoke(self, func, name, func_args):
        """Call func(name, *func_args).

        For names that are neither str nor bytes the call must also emit
        a DeprecationWarning matching 'should be'.
        """
        if isinstance(name, (str, bytes)):
            func(name, *func_args)
            return
        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
            func(name, *func_args)

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each item is (filenames to try, function, *extra arguments).
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    self._invoke(func, name, func_args)
                except OSError as err:
                    # The exception must carry the original object, not a
                    # converted copy of it.
                    self.assertIs(err.filename, name, str(func))
                    continue
                except UnicodeDecodeError:
                    continue
                self.fail("No exception thrown by {}".format(func))
2909
class CPUCountTests(unittest.TestCase):
    """Tests for os.cpu_count()."""

    def test_cpu_count(self):
        # cpu_count() may return None; in that case there is nothing to check.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2918
Victor Stinnerdaf45552013-08-28 00:53:59 +02002919
class FDInheritanceTests(unittest.TestCase):
    """Tests for the inheritable flag of file descriptors created by os."""

    def _open_this_file(self):
        """Open this source file read-only and close it on test cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.FD_CLOEXEC
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec,
                         cloexec)

        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec,
                         0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        cases = (
            ({}, True),                        # inheritable by default
            ({'inheritable': False}, False),   # force non-inheritable
        )
        for dup2_kwargs, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source, target, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master, slave = os.openpty()
        self.addCleanup(os.close, master)
        self.addCleanup(os.close, slave)
        self.assertEqual(os.get_inheritable(master), False)
        self.assertEqual(os.get_inheritable(slave), False)
3002
3003
class PathTConverterTests(unittest.TestCase):
    """Check which argument types os functions accept as a path."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # The bytes spellings are not exercised on Windows.
            bytes_filename = bytes_fspath = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = _PathLike(bytes_filename)
        fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close the descriptor, then unlink.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = _PathLike(fd)
        str_fspath = _PathLike(str_filename)

        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        accepted_paths = [p for p in candidates if p is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                def release(result):
                    if cleanup_fn is not None:
                        cleanup_fn(result)

                for path in accepted_paths:
                    with self.subTest(name=name, path=path):
                        release(fn(path, *extra_args))

                # An object whose __fspath__() returns an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    release(fn(fd, *extra_args))  # should not fail
3059
3060
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Tests for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, checking each transition.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3074
3075
Yury Selivanov97e2e062014-09-26 12:33:06 -04003076
class ExportsTests(unittest.TestCase):
    """Check that selected names are listed in os.__all__."""

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3081
3082
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    setUp() creates a fresh scratch directory for every test and registers
    its removal.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Keep both the str and the bytes spelling of the scratch directory.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        The returned path is bytes when *name* is bytes, str otherwise.
        """
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        filename = os.path.join(directory, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        The sorted entry names must be equal to *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        When *skip_fields* is true, only the st_* attributes other than
        st_dev, st_ino and st_nlink are compared.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check a DirEntry against what os.stat() and os.path report."""
        on_windows = os.name == 'nt'

        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Results with symbolic links followed.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Results for the entry itself.
        unfollowed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # (name, is_dir, is_file, is_symlink) for every entry created.
        expectations = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expectations.append(('link_file.txt', False, True, False))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expectations.append(('symlink_dir', True, False, True))
            expectations.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([item[0] for item in expectations])

        for name, is_dir, is_file, is_symlink in expectations:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the only entry of the scratch directory; it must be *name*."""
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        entries = list(os.scandir(directory))
        self.assertEqual(len(entries), 1)

        only = entries[0]
        self.assertEqual(only.name, name)
        return only

    def create_file_entry(self, name='file.txt'):
        """Create *name* in the scratch directory and return its DirEntry."""
        created = self.create_file(name=name)
        return self.get_entry(os.path.basename(created))

    def _check_removed_entry_stat(self, entry):
        """Check inode() and stat() of an entry whose target was removed."""
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_current_directory(self):
        filename = self.create_file()
        previous = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(previous)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        expected = os.path.join(self.path, 'file.txt')
        self.assertEqual(os.fspath(entry), expected)

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry_stat(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry_stat(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        target = self.create_file("file.txt")
        os.symlink(target,
                   os.path.join(self.path, "symlink.txt"))
        entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
        # Removing the target turns the link into a broken one.
        os.unlink(target)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        # Scanning a bytes path must produce bytes names and paths.
        entries = list(os.scandir(os.fsencode(self.path)))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise an exception
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            # an explicit close inside the with block is allowed
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3367
Victor Stinner6036e442015-03-08 01:58:04 +01003368
class TestPEP519(unittest.TestCase):
    """Tests for the os.fspath() protocol (PEP 519)."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        for raw in (b'hello', b'goodbye', b'some/path/and/file'):
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        for text in ('hello', 'goodbye', 'some/path/and/file'):
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for p in ("path/like/object", b"path/like/object"):
            wrapped = _PathLike(p)

            self.assertEqual(p, self.fspath(wrapped))
            self.assertEqual(b"path/like/object", os.fsencode(wrapped))
            self.assertEqual("path/like/object", os.fsdecode(wrapped))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        self.assertIsInstance(_PathLike(), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for garbage in (int, type, os, vapor()):
            self.assertRaises(TypeError, self.fspath, garbage)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, _PathLike(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003414
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited TestPEP519 test against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003423
3424
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()