blob: 8612ec9edb535690c1eb0fe391bd5812adea1637 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
# Group ids of every group that lists the current user as a member, plus the
# gid of the running process when the platform exposes os.getgid().
# Left empty when the grp module cannot be imported.
try:
    import grp
    groups = [
        entry.gr_gid
        for entry in grp.getgrall()
        if getpass.getuser() in entry.gr_mem
    ]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if groups.count(process_gid) == 0:
            groups.append(process_gid)
except ImportError:
    groups = []
# User ids of every account in the password database.
# Left empty when the pwd module is missing or raises AttributeError
# (for instance when it does not provide getpwall()).
try:
    import pwd
    all_users = [account.pw_uid for account in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when the platform has os.geteuid() and the effective uid is 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0
73
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# False whenever sys.thread_info is missing or reports no version string.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)
Brian Curtineb24d742010-04-12 17:16:38 +000082
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = False
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = (os.getgid() == 0)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Context manager wrapping support.check_warnings() for one filter.

    The filter pairs *msg_regex* with DeprecationWarning; *quiet* is passed
    through to support.check_warnings() unchanged.
    """
    deprecation_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(deprecation_filter, quiet=quiet):
        yield
91
92
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    is_available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(is_available, reason)
95
96
class _PathLike(os.PathLike):
    """Minimal os.PathLike implementation used to exercise __fspath__().

    The wrapped value is returned as-is by __fspath__(), unless it is an
    exception instance, in which case that exception is raised instead.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        wrapped = self.path
        if isinstance(wrapped, BaseException):
            # Lets a test simulate a failing __fspath__() implementation.
            raise wrapped
        return wrapped
110
111
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation binary mode without buffering,
    so an already existing *filename* makes open() raise FileExistsError.
    """
    fp = open(filename, mode="xb", buffering=0)
    try:
        fp.write(content)
    finally:
        fp.close()
115
116
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests of low-level os file functions operating on support.TESTFN.

    TESTFN is removed before and after every test (tearDown is setUp).
    """

    def setUp(self):
        # lexists() also reports a symbolic link whose target is missing,
        # such as the one test_symlink_keywords() may leave behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written, as a bytes object.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so a
            # failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first: fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination.
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
256
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test works on a three-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def _statvfs_or_skip(self):
        """Return os.statvfs(self.fname); skip the test on ENOSYS.

        Any other OSError is re-raised so the test reports the real failure
        rather than continuing without a result.
        """
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path).

        The file is expected to be three bytes long.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Timestamp attributes may carry a fractional part; compare
                # them to the indexed field as whole seconds.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # A TypeError is tolerated but not required here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks, with the file name passed as bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # A stat_result must round-trip through every pickle protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # A TypeError is tolerated but not required here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # A statvfs_result must round-trip through every pickle protocol.
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an int st_file_attributes in 0..0xFFFFFFFF."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
483
Victor Stinner47aacc82015-06-12 17:26:23 +0200484
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a scratch file inside a scratch directory."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            # Remember the current setting so that it is restored on cleanup.
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        """Return True if *filename* shows a timestamp with a fractional part."""
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        """Set timestamps through *set_time* and check what os.stat() reports.

        set_time is called as set_time(filename, (atime_ns, mtime_ns)).
        filename defaults to the scratch file self.fname.
        """
        if not filename:
            filename = self.fname

        subsecond = self.support_subsecond(filename)
        if subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Check that set_time(filename) stamps self.fname with the current time.

        The tolerated difference depends on the timestamp resolution of the
        file and on the platform.
        """
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        elif os.name == 'nt':
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            delta = 0.050
        else:
            # bpo-30649: PPC64 Fedora 3.x buildbot requires
            # at least a delta of 14 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of the volume holding *path*.

        Only implemented for Windows; returns None elsewhere or when the
        volume information cannot be queried.
        """
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
678
679
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000680from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000681
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    # No concrete mapping type is named here; _empty_mapping() and
    # _reference() below supply the objects the tests work on.
    type2test = None

    def setUp(self):
        # Snapshot the current environment (str and, when available, bytes
        # views) so tearDown() can restore it, then seed the reference keys.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        # Restore the environment exactly as it was captured in setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Key/value pairs installed into os.environ by setUp().
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # Returns os.environ itself after emptying it (not a copy).
        os.environ.clear()
        return os.environ

    # Bug 1110478
    # A value set through os.environ.update() must be visible to a child
    # shell process.
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # The object returned by os.popen() must be iterable line by line.
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
                      % unix_shell) as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            # Exact type comparison: subclasses of str are rejected too.
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    # Every reference pair installed by setUp() must be readable back.
    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily rebind os.environ to a plain dict; the finally clause
        # puts the real environ object back even if an assertion fails.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    # os.environ and os.environb must reflect each other's writes, using the
    # filesystem encoding with the surrogateescape error handler.
    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

    # A KeyError raised for a missing variable must carry the caller's own
    # key object (identity, not just equality) and suppress exception context.
    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)
838
Victor Stinner6d101392013-04-14 16:35:04 +0200839
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        # Tests always pass 'follow_symlinks'; os.walk() spells it
        # 'followlinks', so translate before delegating.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # NOTE(review): the diagram above calls this file "tmp5" but the
        # name used on disk is "tmp3" -- confirm which one is intended.
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            # SUB21 is created unconditionally above, so it belongs in the
            # expected directory list even without symlink support.  It must
            # stay the first entry: the cleanup branch below drops it with
            # ``del self.sub2_tree[1][:1]`` when it has to remove SUB21.
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Try to make SUB21 unreadable.  If the platform (or a privileged
        # user) can still list it, the "not readable" scenario cannot be
        # tested: remove SUB21 from disk and from the expected tree instead.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Restore permissions at teardown so rmtree can delete the tree.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        all = list(self.walk(self.walk_path))

        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = all[0][1][0] != "SUB1"
        all[0][1].sort()
        # Sort the SUB2 entry's files and dirs so they compare equal to the
        # (already sorted) expected lists in self.sub2_tree.
        all[3 - 2 * flipped][-1].sort()
        all[3 - 2 * flipped][1].sort()
        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        # walk_path lets test_file_like_path() reuse this test with a
        # path-like object instead of a plain string.
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        all = []
        for root, dirs, files in self.walk(walk_path):
            all.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to all!
                dirs.remove('SUB1')

        self.assertEqual(len(all), 2)
        self.assertEqual(all[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        all[1][-1].sort()
        all[1][1].sort()
        self.assertEqual(all[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        all = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(all), 4, all)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = all[3][1][0] != "SUB1"
        all[3][1].sort()
        all[2 - 2 * flipped][-1].sort()
        all[2 - 2 * flipped][1].sort()
        self.assertEqual(all[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(all[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            # The loop finished without ever visiting the symlinked dir.
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 after it has been listed but before the walk descends
        # into it, so the walker hits a directory that no longer exists.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            # Put SUB1 back under its original name.
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001014
Charles-François Natali7372b062012-02-05 15:15:38 +01001015
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory file descriptor so the inherited WalkTests
        # checks see the same 3-tuples that os.walk() produces.
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        # Work on copies: the caller may pass the same dict for both
        # arguments, and the updates below use different keyword names.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails, the finally clause must not run os.close() on an
        # unbound name and mask the original error with a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # Positional arguments: top, topdown, onerror.
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1080
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        super().setUp()
        self.stack = contextlib.ExitStack()

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        # os.walk() names the option 'followlinks'; rename it when present.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top),
                                                     **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield os.fsdecode(raw_root), dirs, files
            # Write the (possibly pruned) str lists back into the bytes
            # lists owned by os.walk() so caller-side edits take effect.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1101
Charles-François Natali7372b062012-02-05 15:15:38 +01001102
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak a modified umask into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even on failure; tearDown() uses os.removedirs(),
        # which cannot delete a directory that still contains a file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1184
Andrew Svetlov405faed2012-12-25 12:18:09 +02001185
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown() on a scratch directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # Numeric values that are not integers must be rejected for both
        # the uid and the gid position.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        first_gid, second_gid = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and confirm each change took effect.
        for wanted_gid in (first_gid, second_gid):
            os.chown(support.TESTFN, uid, wanted_gid)
            gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and confirm each change took effect.
        for wanted_uid in (first_uid, second_uid):
            os.chown(support.TESTFN, wanted_uid, gid)
            uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one context: the block passes as soon as either
        # of them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1238
1239
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        # Create TESTFN/dira/dirb and return both paths as (dira, dirb).
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        outer, inner = self._make_nested()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the upward removal after dirb.
        outer, inner = self._make_nested()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes the call fail and removes nothing.
        outer, inner = self._make_nested()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1279
1280
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Unbuffered binary write to the null device; the with statement
        # closes the file, so no explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading the null device back must yield no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001288
Andrew Svetlov405faed2012-12-25 12:18:09 +02001289
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the raw
        # bytes it wrote to stdout.
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Check against the requested size rather than a fixed 16 so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same random bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001319
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001320
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        chunk = os.getrandom(16)
        self.assertIsInstance(chunk, bytes)
        self.assertEqual(len(chunk), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only the existence of the flag is checked.
        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1362
1363
# os.urandom() needs no file descriptor when it is built on top of the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True when at least one of the matching build-time config variables is 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001370
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter because it manipulates the
    process-wide file descriptor table or resource limits.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The child writes two 4-byte samples; they must differ from each
        # other, and a second run must differ from the first.  The replacement
        # file holds only b"x" bytes, so output read from it would repeat.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1448
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001449
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001450@contextlib.contextmanager
1451def _execvpe_mockup(defpath=None):
1452 """
1453 Stubs out execv and execve functions when used as context manager.
1454 Records exec calls. The mock execv and execve functions always raise an
1455 exception as they would normally never return.
1456 """
1457 # A list of tuples containing (function name, first arg, args)
1458 # of calls to execv or execve that have been made.
1459 calls = []
1460
1461 def mock_execv(name, *args):
1462 calls.append(('execv', name, args))
1463 raise RuntimeError("execv called")
1464
1465 def mock_execve(name, *args):
1466 calls.append(('execve', name, args))
1467 raise OSError(errno.ENOTDIR, "execve called")
1468
1469 try:
1470 orig_execv = os.execv
1471 orig_execve = os.execve
1472 orig_defpath = os.defpath
1473 os.execv = mock_execv
1474 os.execve = mock_execve
1475 if defpath is not None:
1476 os.defpath = defpath
1477 yield calls
1478 finally:
1479 os.execv = orig_execv
1480 os.execve = orig_execve
1481 os.defpath = orig_defpath
1482
Victor Stinner4659ccf2016-09-14 10:57:00 +02001483
class ExecTests(unittest.TestCase):
    """Tests for the os.exec* family, including the internal os._execvpe."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that does not exist must be reported as OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        # Same rejection for execvpe, with and without an environment.
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad',
                              bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # *test_type*) while execv/execve are replaced by recording mocks.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The path handed to the mock is expected as bytes everywhere
            # except on Windows.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised off Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001555
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001556
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing path on Windows."""

    def setUp(self):
        # Every test relies on support.TESTFN not existing beforehand; fail
        # early if it does, or if its absence cannot be confirmed.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create the path as a regular file (mode "x" fails if it already
        # exists), then check that mkdir on the same name is refused.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Victor Stinnere77c9742016-03-25 10:28:23 +01001592
class TestInvalidFD(unittest.TestCase):
    """Check that fd-taking os functions fail with EBADF on an invalid fd."""

    # Functions that take the file descriptor as their only argument; one
    # test method per name is generated below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for os.<f>; the method does nothing when the
        # os module lacks that function.
        def helper(self):
            try:
                target = getattr(os, f)
            except AttributeError:
                return
            self.check(target)
        return helper
    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as err:
            self.assertEqual(err.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        base_fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for offset in range(10):
            try:
                os.fstat(base_fd + offset)
            except OSError:
                continue
            # First descriptor that turned out to be valid: stop here.
            break
        if offset < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertIsNone(os.closerange(base_fd, base_fd + offset - 1))

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based entry points receive the fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based entry points receive the fd.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        scratch = bytearray(10)
        self.check(os.readv, [scratch])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1691
Brian Curtin1b9df392010-11-24 20:24:31 +00001692
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind file1/file2, so clean up whatever they point at.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2, and check that both names
        # open the same underlying file.
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, unlike a plain bytes(name, encoding) conversion.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1726
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error-path tests for the os.set*uid() / os.set*gid() functions."""

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        # Switching to uid 0 is only expected to fail when not already root.
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # An out-of-range value is rejected in either argument position.
        for ids in ((1 << 32, 0), (0, 1 << 32)):
            self.assertRaises(OverflowError, os.setreuid, *ids)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # An out-of-range value is rejected in either argument position.
        for ids in ((1 << 32, 0), (0, 1 << 32)):
            self.assertRaises(OverflowError, os.setregid, *ids)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001782
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check os functions on file names that round-trip via fsencode/fsdecode."""

    def setUp(self):
        # Use the first available name among unencodable, non-ASCII and plain.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; the optional ones are only tried when the
        # support module provides them.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each created file by its decoded name must succeed.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_name = os.path.join(self.dir, name)
            os.stat(full_name)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001854
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console control events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1 second apart, for the child's message.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start the console-handler helper script, wait until it flags
        # readiness through a shared one-byte mmap, send it *event*, and
        # fail (naming *name*) if it is still running afterwards.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 is falsy but still means the process has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1969
1970
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, remembering the entry names in sorted order.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listed_bytes,
                         list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed_bytes = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(listed_bytes,
                         list(map(os.fsencode, self.created_paths)))
Tim Golden781bbeb2013-10-25 20:24:06 +01002017
2018
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and related stat/remove calls on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Precondition checks.  unittest assertions are used instead of
        # ``assert`` statements so the checks still run under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # Remove whichever links the test created.
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        # Must hold even under ``python -O``, hence not a bare ``assert``.
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() follows *link* to *target* while lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """Check os.stat() on a relative symlink from several directories."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for later tests.
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002128
Brian Curtind40e6f72010-07-08 21:39:08 +00002129
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Precondition checks.  unittest assertions are used instead of
        # ``assert`` statements so the checks still run under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2159
2160
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check that a relative symlink target resolves relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is stripped under
        # ``python -O``, which would leave this test checking nothing.
        self.assertTrue(os.path.isdir(src))
2191
2192
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not encodable with the filesystem encoding: nothing to
                # round-trip for this name.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002206
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002207
Brett Cannonefb00c02012-02-29 18:31:31 -05002208
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec Python knows.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2222
2223
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_cmd = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(child_cmd, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        program = sys.executable
        argv = [program, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        child_pid = os.spawnv(os.P_NOWAIT, _PathLike(program), argv)
        self.assertEqual(os.waitpid(child_pid, 0), (child_pid, 0))
2240
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002241
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv that runs it.

        The script exits with ``self.exitcode`` (17).  With *with_env*, a copy
        of os.environ extended with a unique key is stored in ``self.env`` and
        the script reads that key before exiting.  With *use_bytes*, the
        arguments (and ``self.env`` when *with_env* is set) are encoded with
        os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally would raise AttributeError.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, expect the exit code in the high byte.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # A missing or empty first argument must be rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002366
Brian Curtin0151b8e2010-09-24 13:43:43 +00002367# The introduction of this TestCase caused at least two different errors on
2368# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must be non-empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2375
2376
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        who = os.getpid()

        original = os.getpriority(which, who)
        os.setpriority(which, who, original + 1)
        try:
            current = os.getpriority(which, who)
            if original >= 19 and current <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % original)
            self.assertEqual(current, original + 1)
        finally:
            # Best-effort restore of the starting priority; an EACCES
            # failure is tolerated, any other error propagates.
            try:
                os.setpriority(which, who, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2399
2400
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket run in its own thread via an asyncore loop.

        The connection it accepts is wrapped in a Handler that accumulates
        everything it receives, so a test can compare it with what was sent.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if the loop iteration
                # raises (handle_error re-raises exceptions).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2484
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002485
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET, i.e. disconnected) propagates.
        *headers* and *trailers* are forwarded only when
        SUPPORT_HEADERS_TRAILERS is true.  Their defaults are immutable
        empty tuples rather than shared mutable lists.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a mismatch fails with a short message.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a reserved word, so it is passed through a dict).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated here; anything else is a failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002672
2673
def supports_extended_attributes():
    """Return True if os.setxattr() exists and works on a file at TESTFN.

    A probe file is created exclusively at support.TESTFN and is always
    unlinked again before returning.
    """
    if not hasattr(os, "setxattr"):
        return False

    supported = True
    try:
        with open(support.TESTFN, "xb", 0) as probe:
            # Only a failure of setxattr() itself counts as "unsupported";
            # errors from open() propagate to the caller.
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002688
2689
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):

    def _expect_errno(self, code, func, *args, **kwargs):
        # Call func(*args, **kwargs) and require an OSError carrying *code*.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Exercise the four xattr callables on a fresh test file.  *s*
        # converts attribute names (str or os.fsencode) before they are
        # passed in; **kwargs is forwarded to get/set/remove calls.
        target = support.TESTFN
        self.addCleanup(support.unlink, target)
        create_file(target)

        # Nothing is set yet, so reading the attribute must fail.
        self._expect_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        initial = listxattr(target)
        self.assertIsInstance(initial, list)

        # Create the attribute with an empty value, then replace it.
        setxattr(target, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"")
        setxattr(target, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name ...
        self._expect_errno(errno.EEXIST,
                           setxattr, target, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)

        # ... and XATTR_REPLACE refuses a missing one.
        self._expect_errno(errno.ENODATA,
                           setxattr, target, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(target, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(target)), expected)
        removexattr(target, s("user.test"), **kwargs)

        self._expect_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, s("user.test2"), **kwargs), b"foo")
        setxattr(target, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(target, s("user.test"), **kwargs), b"a"*1024)
        removexattr(target, s("user.test"), **kwargs)

        # Finally set one hundred attributes and list them all back.
        many = ["user.test{}".format(i) for i in range(100)]
        many.sort()
        for attr_name in many:
            setxattr(target, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(target)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrap each os.*xattr function so that it receives an open file
        # descriptor for the path instead of the path itself.
        def fd_getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.setxattr(stream.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb", 0) as stream:
                os.removexattr(stream.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2773
2774
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _terminal_size_or_skip(self, *args):
        # Return os.get_terminal_size(*args).  The current test is skipped
        # when the call raises OSError on win32, or raises OSError with
        # errno EINVAL/ENOTTY elsewhere; any other OSError propagates.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that stty's complaint about a
            # non-terminal stdin does not leak into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty is missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2817
2818
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        class Str(str):
            pass

        # Use the special unencodable/undecodable names when test.support
        # provides them, otherwise fall back to the plain TESTFN.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded,
                                bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each entry is (names to try, function, *extra positional args).
        # Every call is expected to raise; for OSError the exception's
        # filename must be the very object that was passed in.
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        funcs = [
            (every_name, os.chdir),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir),
            (every_name, os.stat),
            (every_name, os.unlink),
        ]
        if on_windows:
            funcs += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                (text_names, os.listdir),
            ]
        else:
            funcs += [
                (every_name, os.listdir),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every_name, os.listxattr),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((text_names, os.readlink))
            else:
                funcs.append((every_name, os.readlink))

        for candidates, func, *extra_args in funcs:
            for candidate in candidates:
                try:
                    if not isinstance(candidate, (str, bytes)):
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(candidate, *extra_args)
                    else:
                        func(candidate, *extra_args)
                except OSError as exc:
                    self.assertIs(exc.filename, candidate, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
2914
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (the test is then skipped)
        # or a strictly positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2923
Victor Stinnerdaf45552013-08-28 00:53:59 +02002924
class FDInheritanceTests(unittest.TestCase):
    def _open_this_file(self):
        # Open this source file read-only and close the descriptor again
        # when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(fd, target)
            self.assertEqual(os.get_inheritable(target), True)
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(fd, target, inheritable=False)
            self.assertEqual(os.get_inheritable(target), False)
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
3007
3008
class PathTConverterTests(unittest.TestCase):
    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        # For every listed os function that exists: str/bytes paths and
        # path-like wrappers around them are accepted, a path-like wrapping
        # an int is rejected, and a raw fd is accepted only when allowed.
        str_filename = support.TESTFN
        if os.name == 'nt':
            # Bytes paths are left out on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = _PathLike(bytes_filename)
        fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run last-in first-out: close the fd, then unlink.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = _PathLike(fd)
        str_fspath = _PathLike(str_filename)

        candidates = (str_filename, bytes_filename, str_fspath, bytes_fspath)
        valid_paths = [p for p in candidates if p is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Not available on this platform.
                    continue

                for path in valid_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
3064
3065
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        # A freshly opened descriptor reports blocking mode; switching it
        # off and back on must be reflected by os.get_blocking().
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3079
3080
Yury Selivanov97e2e062014-09-26 12:33:06 -04003081
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3086
3087
class TestScandir(unittest.TestCase):
    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Each test gets its own directory, removed again on cleanup.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Join *name* with the directory spelled in the same type (bytes
        # or str), create the file and return its full name.
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        filename = os.path.join(directory, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the test directory, check that the sorted entry names equal
        # *names*, and return a name -> entry mapping.
        by_name = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(by_name.keys()), names)
        return by_name

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # With skip_fields true, compare the st_* attributes one by one
        # while ignoring st_dev, st_ino and st_nlink; otherwise compare
        # the two stat results as a whole.
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Compare what the DirEntry reports against os.stat()/os.path.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Symlink-following view.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # View of the entry itself, without following symlinks.
        unfollowed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        on_windows = os.name == 'nt'
        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink) for everything created above.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([item[0] for item in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        # Return the single entry of the test directory, scanning with a
        # bytes or str path to match the type of *name*.
        if isinstance(name, bytes):
            directory = self.bytes_path
        else:
            directory = self.path
        found = list(os.scandir(directory))
        self.assertEqual(len(found), 1)

        only = found[0]
        self.assertEqual(only.name, name)
        return only

    def create_file_entry(self, name='file.txt'):
        created = self.create_file(name=name)
        return self.get_entry(os.path.basename(created))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            listed = sorted(entry.name for entry in os.scandir())
            self.assertEqual(listed, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def _check_stale_entry(self, entry):
        # Checks shared by the "removed" tests once the file system object
        # behind *entry* is gone.
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_stale_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_stale_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise exception
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # Dropping a partially consumed, unclosed iterator must warn.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3372
Victor Stinner6036e442015-03-08 01:58:04 +01003373
Ethan Furmancdc08792016-06-02 15:06:09 -07003374class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003375
3376 # Abstracted so it can be overridden to test pure Python implementation
3377 # if a C version is provided.
3378 fspath = staticmethod(os.fspath)
3379
Ethan Furmancdc08792016-06-02 15:06:09 -07003380 def test_return_bytes(self):
3381 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003382 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003383
3384 def test_return_string(self):
3385 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003386 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003387
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003388 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003389 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003390 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003391
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003392 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003393 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3394 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3395
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003396 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003397 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3398 self.assertTrue(issubclass(_PathLike, os.PathLike))
3399 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003400
Ethan Furmancdc08792016-06-02 15:06:09 -07003401 def test_garbage_in_exception_out(self):
3402 vapor = type('blah', (), {})
3403 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003404 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003405
3406 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003407 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003408
Brett Cannon044283a2016-07-15 10:41:49 -07003409 def test_bad_pathlike(self):
3410 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003411 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003412 # __fspath__ attribute that is not callable.
3413 c = type('foo', (), {})
3414 c.__fspath__ = 1
3415 self.assertRaises(TypeError, self.fspath, c())
3416 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003417 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003418 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003419
# os._fspath is only looked for here: when it exists, the whole TestPEP519
# suite is repeated against it; when it does not, TestPEP519 has already
# covered the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Run the TestPEP519 tests against os._fspath instead of os.fspath."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003428
3429
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()