blob: e9fdb0719f4bf4267d0c51bbbe5f53e861bcb6f6 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010059except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050060 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
R David Murrayf2ad1732014-12-25 18:36:56 -050070root_in_posix = False
71if hasattr(os, 'geteuid'):
72 root_in_posix = (os.geteuid() == 0)
73
Mark Dickinson7cf03892010-04-16 13:45:35 +000074# Detect whether we're on a Linux system that uses the (now outdated
75# and unmaintained) linuxthreads threading library. There's an issue
76# when combining linuxthreads with a failed execv call: see
77# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020078if hasattr(sys, 'thread_info') and sys.thread_info.version:
79 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
80else:
81 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000082
Stefan Krahebee49a2013-01-17 15:31:00 +010083# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
84HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
87@contextlib.contextmanager
88def ignore_deprecation_warnings(msg_regex, quiet=False):
89 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
90 yield
91
92
Berker Peksag4af23d72016-09-15 20:32:44 +030093def requires_os_func(name):
94 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
95
96
Brett Cannonec6ce872016-09-06 15:50:29 -070097class _PathLike(os.PathLike):
98
99 def __init__(self, path=""):
100 self.path = path
101
102 def __str__(self):
103 return str(self.path)
104
105 def __fspath__(self):
106 if isinstance(self.path, BaseException):
107 raise self.path
108 else:
109 return self.path
110
111
Victor Stinnerae39d232016-03-24 17:12:55 +0100112def create_file(filename, content=b'content'):
113 with open(filename, "xb", 0) as fp:
114 fp.write(content)
115
116
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117# Tests creating TESTFN
118class FileTests(unittest.TestCase):
119 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000120 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000121 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000122 tearDown = setUp
123
124 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000125 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000126 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000127 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000128
Christian Heimesfdab48e2008-01-20 09:06:41 +0000129 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000130 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
131 # We must allocate two consecutive file descriptors, otherwise
132 # it will mess up other file descriptors (perhaps even the three
133 # standard ones).
134 second = os.dup(first)
135 try:
136 retries = 0
137 while second != first + 1:
138 os.close(first)
139 retries += 1
140 if retries > 10:
141 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000142 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000143 first, second = second, os.dup(second)
144 finally:
145 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000146 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000147 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000148 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000149
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000150 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000151 def test_rename(self):
152 path = support.TESTFN
153 old = sys.getrefcount(path)
154 self.assertRaises(TypeError, os.rename, path, 0)
155 new = sys.getrefcount(path)
156 self.assertEqual(old, new)
157
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000158 def test_read(self):
159 with open(support.TESTFN, "w+b") as fobj:
160 fobj.write(b"spam")
161 fobj.flush()
162 fd = fobj.fileno()
163 os.lseek(fd, 0, 0)
164 s = os.read(fd, 4)
165 self.assertEqual(type(s), bytes)
166 self.assertEqual(s, b"spam")
167
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200168 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200169 # Skip the test on 32-bit platforms: the number of bytes must fit in a
170 # Py_ssize_t type
171 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
172 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200173 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
174 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200175 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100176 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200177
178 # Issue #21932: Make sure that os.read() does not raise an
179 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200180 with open(support.TESTFN, "rb") as fp:
181 data = os.read(fp.fileno(), size)
182
183 # The test does not try to read more than 2 GB at once because the
184 # operating system is free to return less bytes than requested.
185 self.assertEqual(data, b'test')
186
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000187 def test_write(self):
188 # os.write() accepts bytes- and buffer-like objects but not strings
189 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
190 self.assertRaises(TypeError, os.write, fd, "beans")
191 os.write(fd, b"bacon\n")
192 os.write(fd, bytearray(b"eggs\n"))
193 os.write(fd, memoryview(b"spam\n"))
194 os.close(fd)
195 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000196 self.assertEqual(fobj.read().splitlines(),
197 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000198
Victor Stinnere0daff12011-03-20 23:36:35 +0100199 def write_windows_console(self, *args):
200 retcode = subprocess.call(args,
201 # use a new console to not flood the test output
202 creationflags=subprocess.CREATE_NEW_CONSOLE,
203 # use a shell to hide the console window (SW_HIDE)
204 shell=True)
205 self.assertEqual(retcode, 0)
206
207 @unittest.skipUnless(sys.platform == 'win32',
208 'test specific to the Windows console')
209 def test_write_windows_console(self):
210 # Issue #11395: the Windows console returns an error (12: not enough
211 # space error) on writing into stdout if stdout mode is binary and the
212 # length is greater than 66,000 bytes (or less, depending on heap
213 # usage).
214 code = "print('x' * 100000)"
215 self.write_windows_console(sys.executable, "-c", code)
216 self.write_windows_console(sys.executable, "-u", "-c", code)
217
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000218 def fdopen_helper(self, *args):
219 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200220 f = os.fdopen(fd, *args)
221 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000222
223 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200224 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
225 os.close(fd)
226
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000227 self.fdopen_helper()
228 self.fdopen_helper('r')
229 self.fdopen_helper('r', 100)
230
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100231 def test_replace(self):
232 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100233 self.addCleanup(support.unlink, support.TESTFN)
234 self.addCleanup(support.unlink, TESTFN2)
235
236 create_file(support.TESTFN, b"1")
237 create_file(TESTFN2, b"2")
238
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100239 os.replace(support.TESTFN, TESTFN2)
240 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
241 with open(TESTFN2, 'r') as f:
242 self.assertEqual(f.read(), "1")
243
Martin Panterbf19d162015-09-09 01:01:13 +0000244 def test_open_keywords(self):
245 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
246 dir_fd=None)
247 os.close(f)
248
249 def test_symlink_keywords(self):
250 symlink = support.get_attribute(os, "symlink")
251 try:
252 symlink(src='target', dst=support.TESTFN,
253 target_is_directory=False, dir_fd=None)
254 except (NotImplementedError, OSError):
255 pass # No OS support or unprivileged user
256
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000258# Test attributes on return values from os.*stat* family.
259class StatAttributeTests(unittest.TestCase):
260 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200261 self.fname = support.TESTFN
262 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100263 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264
Serhiy Storchaka43767632013-11-03 21:31:38 +0200265 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000266 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000267 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000268
269 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000270 self.assertEqual(result[stat.ST_SIZE], 3)
271 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000273 # Make sure all the attributes are there
274 members = dir(result)
275 for name in dir(stat):
276 if name[:3] == 'ST_':
277 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000278 if name.endswith("TIME"):
279 def trunc(x): return int(x)
280 else:
281 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000282 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000283 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000285
Larry Hastings6fe20b32012-04-19 15:07:49 -0700286 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700287 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700288 for name in 'st_atime st_mtime st_ctime'.split():
289 floaty = int(getattr(result, name) * 100000)
290 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700291 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700292
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000293 try:
294 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200295 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000296 except IndexError:
297 pass
298
299 # Make sure that assignment fails
300 try:
301 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200302 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000303 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000304 pass
305
306 try:
307 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200308 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000309 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000310 pass
311
312 try:
313 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200314 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000315 except AttributeError:
316 pass
317
318 # Use the stat_result constructor with a too-short tuple.
319 try:
320 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200321 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000322 except TypeError:
323 pass
324
Ezio Melotti42da6632011-03-15 05:18:48 +0200325 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000326 try:
327 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
328 except TypeError:
329 pass
330
Antoine Pitrou38425292010-09-21 18:19:07 +0000331 def test_stat_attributes(self):
332 self.check_stat_attributes(self.fname)
333
334 def test_stat_attributes_bytes(self):
335 try:
336 fname = self.fname.encode(sys.getfilesystemencoding())
337 except UnicodeEncodeError:
338 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700339 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000340
Christian Heimes25827622013-10-12 01:27:08 +0200341 def test_stat_result_pickle(self):
342 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200343 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
344 p = pickle.dumps(result, proto)
345 self.assertIn(b'stat_result', p)
346 if proto < 4:
347 self.assertIn(b'cos\nstat_result\n', p)
348 unpickled = pickle.loads(p)
349 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200350
Serhiy Storchaka43767632013-11-03 21:31:38 +0200351 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000352 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000353 try:
354 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000355 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000356 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000357 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200358 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000359
360 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000361 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000363 # Make sure all the attributes are there.
364 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
365 'ffree', 'favail', 'flag', 'namemax')
366 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000367 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000368
369 # Make sure that assignment really fails
370 try:
371 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200372 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000373 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000374 pass
375
376 try:
377 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200378 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000379 except AttributeError:
380 pass
381
382 # Use the constructor with a too-short tuple.
383 try:
384 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200385 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000386 except TypeError:
387 pass
388
Ezio Melotti42da6632011-03-15 05:18:48 +0200389 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000390 try:
391 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
392 except TypeError:
393 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000394
Christian Heimes25827622013-10-12 01:27:08 +0200395 @unittest.skipUnless(hasattr(os, 'statvfs'),
396 "need os.statvfs()")
397 def test_statvfs_result_pickle(self):
398 try:
399 result = os.statvfs(self.fname)
400 except OSError as e:
401 # On AtheOS, glibc always returns ENOSYS
402 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200403 self.skipTest('os.statvfs() failed with ENOSYS')
404
Serhiy Storchakabad12572014-12-15 14:03:42 +0200405 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
406 p = pickle.dumps(result, proto)
407 self.assertIn(b'statvfs_result', p)
408 if proto < 4:
409 self.assertIn(b'cos\nstatvfs_result\n', p)
410 unpickled = pickle.loads(p)
411 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200412
Serhiy Storchaka43767632013-11-03 21:31:38 +0200413 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
414 def test_1686475(self):
415 # Verify that an open file can be stat'ed
416 try:
417 os.stat(r"c:\pagefile.sys")
418 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600419 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200420 except OSError as e:
421 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000422
Serhiy Storchaka43767632013-11-03 21:31:38 +0200423 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
424 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
425 def test_15261(self):
426 # Verify that stat'ing a closed fd does not cause crash
427 r, w = os.pipe()
428 try:
429 os.stat(r) # should not raise error
430 finally:
431 os.close(r)
432 os.close(w)
433 with self.assertRaises(OSError) as ctx:
434 os.stat(r)
435 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100436
Zachary Ware63f277b2014-06-19 09:46:37 -0500437 def check_file_attributes(self, result):
438 self.assertTrue(hasattr(result, 'st_file_attributes'))
439 self.assertTrue(isinstance(result.st_file_attributes, int))
440 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
441
442 @unittest.skipUnless(sys.platform == "win32",
443 "st_file_attributes is Win32 specific")
444 def test_file_attributes(self):
445 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
446 result = os.stat(self.fname)
447 self.check_file_attributes(result)
448 self.assertEqual(
449 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
450 0)
451
452 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200453 dirname = support.TESTFN + "dir"
454 os.mkdir(dirname)
455 self.addCleanup(os.rmdir, dirname)
456
457 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500458 self.check_file_attributes(result)
459 self.assertEqual(
460 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
461 stat.FILE_ATTRIBUTE_DIRECTORY)
462
Berker Peksag0b4dc482016-09-17 15:49:59 +0300463 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
464 def test_access_denied(self):
465 # Default to FindFirstFile WIN32_FIND_DATA when access is
466 # denied. See issue 28075.
467 # os.environ['TEMP'] should be located on a volume that
468 # supports file ACLs.
469 fname = os.path.join(os.environ['TEMP'], self.fname)
470 self.addCleanup(support.unlink, fname)
471 create_file(fname, b'ABC')
472 # Deny the right to [S]YNCHRONIZE on the file to
473 # force CreateFile to fail with ERROR_ACCESS_DENIED.
474 DETACHED_PROCESS = 8
475 subprocess.check_call(
476 ['icacls.exe', fname, '/deny', 'Users:(S)'],
477 creationflags=DETACHED_PROCESS
478 )
479 result = os.stat(fname)
480 self.assertNotEqual(result.st_size, 0)
481
Victor Stinner47aacc82015-06-12 17:26:23 +0200482
483class UtimeTests(unittest.TestCase):
484 def setUp(self):
485 self.dirname = support.TESTFN
486 self.fname = os.path.join(self.dirname, "f1")
487
488 self.addCleanup(support.rmtree, self.dirname)
489 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100490 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200491
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200492 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100493 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200494 os.stat_float_times(state)
495
Victor Stinner47aacc82015-06-12 17:26:23 +0200496 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100497 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200498 old_float_times = os.stat_float_times(-1)
499 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200500
501 os.stat_float_times(True)
502
503 def support_subsecond(self, filename):
504 # Heuristic to check if the filesystem supports timestamp with
505 # subsecond resolution: check if float and int timestamps are different
506 st = os.stat(filename)
507 return ((st.st_atime != st[7])
508 or (st.st_mtime != st[8])
509 or (st.st_ctime != st[9]))
510
511 def _test_utime(self, set_time, filename=None):
512 if not filename:
513 filename = self.fname
514
515 support_subsecond = self.support_subsecond(filename)
516 if support_subsecond:
517 # Timestamp with a resolution of 1 microsecond (10^-6).
518 #
519 # The resolution of the C internal function used by os.utime()
520 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
521 # test with a resolution of 1 ns requires more work:
522 # see the issue #15745.
523 atime_ns = 1002003000 # 1.002003 seconds
524 mtime_ns = 4005006000 # 4.005006 seconds
525 else:
526 # use a resolution of 1 second
527 atime_ns = 5 * 10**9
528 mtime_ns = 8 * 10**9
529
530 set_time(filename, (atime_ns, mtime_ns))
531 st = os.stat(filename)
532
533 if support_subsecond:
534 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
535 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
536 else:
537 self.assertEqual(st.st_atime, atime_ns * 1e-9)
538 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
539 self.assertEqual(st.st_atime_ns, atime_ns)
540 self.assertEqual(st.st_mtime_ns, mtime_ns)
541
542 def test_utime(self):
543 def set_time(filename, ns):
544 # test the ns keyword parameter
545 os.utime(filename, ns=ns)
546 self._test_utime(set_time)
547
548 @staticmethod
549 def ns_to_sec(ns):
550 # Convert a number of nanosecond (int) to a number of seconds (float).
551 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
552 # issue, os.utime() rounds towards minus infinity.
553 return (ns * 1e-9) + 0.5e-9
554
555 def test_utime_by_indexed(self):
556 # pass times as floating point seconds as the second indexed parameter
557 def set_time(filename, ns):
558 atime_ns, mtime_ns = ns
559 atime = self.ns_to_sec(atime_ns)
560 mtime = self.ns_to_sec(mtime_ns)
561 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
562 # or utime(time_t)
563 os.utime(filename, (atime, mtime))
564 self._test_utime(set_time)
565
566 def test_utime_by_times(self):
567 def set_time(filename, ns):
568 atime_ns, mtime_ns = ns
569 atime = self.ns_to_sec(atime_ns)
570 mtime = self.ns_to_sec(mtime_ns)
571 # test the times keyword parameter
572 os.utime(filename, times=(atime, mtime))
573 self._test_utime(set_time)
574
575 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
576 "follow_symlinks support for utime required "
577 "for this test.")
578 def test_utime_nofollow_symlinks(self):
579 def set_time(filename, ns):
580 # use follow_symlinks=False to test utimensat(timespec)
581 # or lutimes(timeval)
582 os.utime(filename, ns=ns, follow_symlinks=False)
583 self._test_utime(set_time)
584
585 @unittest.skipUnless(os.utime in os.supports_fd,
586 "fd support for utime required for this test.")
587 def test_utime_fd(self):
588 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100589 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200590 # use a file descriptor to test futimens(timespec)
591 # or futimes(timeval)
592 os.utime(fp.fileno(), ns=ns)
593 self._test_utime(set_time)
594
595 @unittest.skipUnless(os.utime in os.supports_dir_fd,
596 "dir_fd support for utime required for this test.")
597 def test_utime_dir_fd(self):
598 def set_time(filename, ns):
599 dirname, name = os.path.split(filename)
600 dirfd = os.open(dirname, os.O_RDONLY)
601 try:
602 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
603 os.utime(name, dir_fd=dirfd, ns=ns)
604 finally:
605 os.close(dirfd)
606 self._test_utime(set_time)
607
608 def test_utime_directory(self):
609 def set_time(filename, ns):
610 # test calling os.utime() on a directory
611 os.utime(filename, ns=ns)
612 self._test_utime(set_time, filename=self.dirname)
613
614 def _test_utime_current(self, set_time):
615 # Get the system clock
616 current = time.time()
617
618 # Call os.utime() to set the timestamp to the current system clock
619 set_time(self.fname)
620
621 if not self.support_subsecond(self.fname):
622 delta = 1.0
623 else:
624 # On Windows, the usual resolution of time.time() is 15.6 ms
625 delta = 0.020
626 st = os.stat(self.fname)
627 msg = ("st_time=%r, current=%r, dt=%r"
628 % (st.st_mtime, current, st.st_mtime - current))
629 self.assertAlmostEqual(st.st_mtime, current,
630 delta=delta, msg=msg)
631
632 def test_utime_current(self):
633 def set_time(filename):
634 # Set to the current time in the new way
635 os.utime(self.fname)
636 self._test_utime_current(set_time)
637
638 def test_utime_current_old(self):
639 def set_time(filename):
640 # Set to the current time in the old explicit way.
641 os.utime(self.fname, None)
642 self._test_utime_current(set_time)
643
644 def get_file_system(self, path):
645 if sys.platform == 'win32':
646 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
647 import ctypes
648 kernel32 = ctypes.windll.kernel32
649 buf = ctypes.create_unicode_buffer("", 100)
650 ok = kernel32.GetVolumeInformationW(root, None, 0,
651 None, None, None,
652 buf, len(buf))
653 if ok:
654 return buf.value
655 # return None if the filesystem is unknown
656
657 def test_large_time(self):
658 # Many filesystems are limited to the year 2038. At least, the test
659 # pass with NTFS filesystem.
660 if self.get_file_system(self.dirname) != "NTFS":
661 self.skipTest("requires NTFS")
662
663 large = 5000000000 # some day in 2128
664 os.utime(self.fname, (large, large))
665 self.assertEqual(os.stat(self.fname).st_mtime, large)
666
667 def test_utime_invalid_arguments(self):
668 # seconds and nanoseconds parameters are mutually exclusive
669 with self.assertRaises(ValueError):
670 os.utime(self.fname, (5, 5), ns=(5, 5))
671
672
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000673from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000674
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000675class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000676 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000677 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000678
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000679 def setUp(self):
680 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000681 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000682 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000683 for key, value in self._reference().items():
684 os.environ[key] = value
685
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000686 def tearDown(self):
687 os.environ.clear()
688 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000689 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000690 os.environb.clear()
691 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000692
Christian Heimes90333392007-11-01 19:08:42 +0000693 def _reference(self):
694 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
695
696 def _empty_mapping(self):
697 os.environ.clear()
698 return os.environ
699
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000700 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200701 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
702 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000703 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000704 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300705 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200706 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300707 value = popen.read().strip()
708 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000709
Xavier de Gayed1415312016-07-22 12:15:29 +0200710 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
711 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000712 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200713 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
714 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300715 it = iter(popen)
716 self.assertEqual(next(it), "line1\n")
717 self.assertEqual(next(it), "line2\n")
718 self.assertEqual(next(it), "line3\n")
719 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000720
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000721 # Verify environ keys and values from the OS are of the
722 # correct str type.
723 def test_keyvalue_types(self):
724 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000725 self.assertEqual(type(key), str)
726 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000727
Christian Heimes90333392007-11-01 19:08:42 +0000728 def test_items(self):
729 for key, value in self._reference().items():
730 self.assertEqual(os.environ.get(key), value)
731
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000732 # Issue 7310
733 def test___repr__(self):
734 """Check that the repr() of os.environ looks like environ({...})."""
735 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000736 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
737 '{!r}: {!r}'.format(key, value)
738 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000739
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000740 def test_get_exec_path(self):
741 defpath_list = os.defpath.split(os.pathsep)
742 test_path = ['/monty', '/python', '', '/flying/circus']
743 test_env = {'PATH': os.pathsep.join(test_path)}
744
745 saved_environ = os.environ
746 try:
747 os.environ = dict(test_env)
748 # Test that defaulting to os.environ works.
749 self.assertSequenceEqual(test_path, os.get_exec_path())
750 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
751 finally:
752 os.environ = saved_environ
753
754 # No PATH environment variable
755 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
756 # Empty PATH environment variable
757 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
758 # Supplied PATH environment variable
759 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
760
Victor Stinnerb745a742010-05-18 17:17:23 +0000761 if os.supports_bytes_environ:
762 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000763 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000764 # ignore BytesWarning warning
765 with warnings.catch_warnings(record=True):
766 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000767 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000768 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000769 pass
770 else:
771 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000772
773 # bytes key and/or value
774 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
775 ['abc'])
776 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
777 ['abc'])
778 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
779 ['abc'])
780
781 @unittest.skipUnless(os.supports_bytes_environ,
782 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000783 def test_environb(self):
784 # os.environ -> os.environb
785 value = 'euro\u20ac'
786 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000787 value_bytes = value.encode(sys.getfilesystemencoding(),
788 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000789 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000790 msg = "U+20AC character is not encodable to %s" % (
791 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000792 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000793 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000794 self.assertEqual(os.environ['unicode'], value)
795 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000796
797 # os.environb -> os.environ
798 value = b'\xff'
799 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000800 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000801 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000802 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000803
Charles-François Natali2966f102011-11-26 11:32:46 +0100804 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
805 # #13415).
806 @support.requires_freebsd_version(7)
807 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100808 def test_unset_error(self):
809 if sys.platform == "win32":
810 # an environment variable is limited to 32,767 characters
811 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100812 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100813 else:
814 # "=" is not allowed in a variable name
815 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100816 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100817
Victor Stinner6d101392013-04-14 16:35:04 +0200818 def test_key_type(self):
819 missing = 'missingkey'
820 self.assertNotIn(missing, os.environ)
821
Victor Stinner839e5ea2013-04-14 16:43:03 +0200822 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200823 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200824 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200825 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200826
Victor Stinner839e5ea2013-04-14 16:43:03 +0200827 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200828 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200829 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200830 self.assertTrue(cm.exception.__suppress_context__)
831
Victor Stinner6d101392013-04-14 16:35:04 +0200832
Tim Petersc4e09402003-04-25 07:11:48 +0000833class WalkTests(unittest.TestCase):
834 """Tests for os.walk()."""
835
Victor Stinner0561c532015-03-12 10:28:24 +0100836 # Wrapper to hide minor differences between os.walk and os.fwalk
837 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200838 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200839 if 'follow_symlinks' in kwargs:
840 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200841 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100842
Charles-François Natali7372b062012-02-05 15:15:38 +0100843 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100844 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100845 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000846
847 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000848 # TESTFN/
849 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000850 # tmp1
851 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000852 # tmp2
853 # SUB11/ no kids
854 # SUB2/ a file kid and a dirsymlink kid
855 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300856 # SUB21/ not readable
857 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000858 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200859 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300860 # broken_link2
861 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000862 # TEST2/
863 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100864 self.walk_path = join(support.TESTFN, "TEST1")
865 self.sub1_path = join(self.walk_path, "SUB1")
866 self.sub11_path = join(self.sub1_path, "SUB11")
867 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300868 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100869 tmp1_path = join(self.walk_path, "tmp1")
870 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000871 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300872 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100873 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000874 t2_path = join(support.TESTFN, "TEST2")
875 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200876 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300877 broken_link2_path = join(sub2_path, "broken_link2")
878 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000879
880 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100881 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000882 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300883 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000884 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100885
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300886 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100887 with open(path, "x") as f:
888 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000889
Victor Stinner0561c532015-03-12 10:28:24 +0100890 if support.can_symlink():
891 os.symlink(os.path.abspath(t2_path), self.link_path)
892 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300893 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
894 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300895 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300896 ["broken_link", "broken_link2", "broken_link3",
897 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100898 else:
899 self.sub2_tree = (sub2_path, [], ["tmp3"])
900
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300901 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300902 try:
903 os.listdir(sub21_path)
904 except PermissionError:
905 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
906 else:
907 os.chmod(sub21_path, stat.S_IRWXU)
908 os.unlink(tmp5_path)
909 os.rmdir(sub21_path)
910 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300911
Victor Stinner0561c532015-03-12 10:28:24 +0100912 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000913 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300914 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100915
Tim Petersc4e09402003-04-25 07:11:48 +0000916 self.assertEqual(len(all), 4)
917 # We can't know which order SUB1 and SUB2 will appear in.
918 # Not flipped: TESTFN, SUB1, SUB11, SUB2
919 # flipped: TESTFN, SUB2, SUB1, SUB11
920 flipped = all[0][1][0] != "SUB1"
921 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200922 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300923 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100924 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
925 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
926 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
927 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000928
Brett Cannon3f9183b2016-08-26 14:44:48 -0700929 def test_walk_prune(self, walk_path=None):
930 if walk_path is None:
931 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000932 # Prune the search.
933 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700934 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000935 all.append((root, dirs, files))
936 # Don't descend into SUB1.
937 if 'SUB1' in dirs:
938 # Note that this also mutates the dirs we appended to all!
939 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000940
Victor Stinner0561c532015-03-12 10:28:24 +0100941 self.assertEqual(len(all), 2)
942 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700943 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100944
945 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300946 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100947 self.assertEqual(all[1], self.sub2_tree)
948
Brett Cannon3f9183b2016-08-26 14:44:48 -0700949 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700950 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700951
Victor Stinner0561c532015-03-12 10:28:24 +0100952 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000953 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100954 all = list(self.walk(self.walk_path, topdown=False))
955
Victor Stinner53b0a412016-03-26 01:12:36 +0100956 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000957 # We can't know which order SUB1 and SUB2 will appear in.
958 # Not flipped: SUB11, SUB1, SUB2, TESTFN
959 # flipped: SUB2, SUB11, SUB1, TESTFN
960 flipped = all[3][1][0] != "SUB1"
961 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200962 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300963 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100964 self.assertEqual(all[3],
965 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
966 self.assertEqual(all[flipped],
967 (self.sub11_path, [], []))
968 self.assertEqual(all[flipped + 1],
969 (self.sub1_path, ["SUB11"], ["tmp2"]))
970 self.assertEqual(all[2 - 2 * flipped],
971 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000972
Victor Stinner0561c532015-03-12 10:28:24 +0100973 def test_walk_symlink(self):
974 if not support.can_symlink():
975 self.skipTest("need symlink support")
976
977 # Walk, following symlinks.
978 walk_it = self.walk(self.walk_path, follow_symlinks=True)
979 for root, dirs, files in walk_it:
980 if root == self.link_path:
981 self.assertEqual(dirs, [])
982 self.assertEqual(files, ["tmp4"])
983 break
984 else:
985 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000986
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200987 def test_walk_bad_dir(self):
988 # Walk top-down.
989 errors = []
990 walk_it = self.walk(self.walk_path, onerror=errors.append)
991 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300992 self.assertEqual(errors, [])
993 dir1 = 'SUB1'
994 path1 = os.path.join(root, dir1)
995 path1new = os.path.join(root, dir1 + '.new')
996 os.rename(path1, path1new)
997 try:
998 roots = [r for r, d, f in walk_it]
999 self.assertTrue(errors)
1000 self.assertNotIn(path1, roots)
1001 self.assertNotIn(path1new, roots)
1002 for dir2 in dirs:
1003 if dir2 != dir1:
1004 self.assertIn(os.path.join(root, dir2), roots)
1005 finally:
1006 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001007
Charles-François Natali7372b062012-02-05 15:15:38 +01001008
1009@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1010class FwalkTests(WalkTests):
1011 """Tests for os.fwalk()."""
1012
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001013 def walk(self, top, **kwargs):
1014 for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001015 yield (root, dirs, files)
1016
Larry Hastingsc48fe982012-06-25 04:49:05 -07001017 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1018 """
1019 compare with walk() results.
1020 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001021 walk_kwargs = walk_kwargs.copy()
1022 fwalk_kwargs = fwalk_kwargs.copy()
1023 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1024 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1025 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001026
Charles-François Natali7372b062012-02-05 15:15:38 +01001027 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001028 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001029 expected[root] = (set(dirs), set(files))
1030
Larry Hastingsc48fe982012-06-25 04:49:05 -07001031 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001032 self.assertIn(root, expected)
1033 self.assertEqual(expected[root], (set(dirs), set(files)))
1034
Larry Hastingsc48fe982012-06-25 04:49:05 -07001035 def test_compare_to_walk(self):
1036 kwargs = {'top': support.TESTFN}
1037 self._compare_to_walk(kwargs, kwargs)
1038
Charles-François Natali7372b062012-02-05 15:15:38 +01001039 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001040 try:
1041 fd = os.open(".", os.O_RDONLY)
1042 walk_kwargs = {'top': support.TESTFN}
1043 fwalk_kwargs = walk_kwargs.copy()
1044 fwalk_kwargs['dir_fd'] = fd
1045 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1046 finally:
1047 os.close(fd)
1048
1049 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001050 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001051 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1052 args = support.TESTFN, topdown, None
1053 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001054 # check that the FD is valid
1055 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001056 # redundant check
1057 os.stat(rootfd)
1058 # check that listdir() returns consistent information
1059 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001060
1061 def test_fd_leak(self):
1062 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1063 # we both check that calling fwalk() a large number of times doesn't
1064 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1065 minfd = os.dup(1)
1066 os.close(minfd)
1067 for i in range(256):
1068 for x in os.fwalk(support.TESTFN):
1069 pass
1070 newfd = os.dup(1)
1071 self.addCleanup(os.close, newfd)
1072 self.assertEqual(newfd, minfd)
1073
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001074class BytesWalkTests(WalkTests):
1075 """Tests for os.walk() with bytes."""
Serhiy Storchakaada6db72016-03-08 21:26:26 +02001076 def setUp(self):
1077 super().setUp()
1078 self.stack = contextlib.ExitStack()
Serhiy Storchakaada6db72016-03-08 21:26:26 +02001079
1080 def tearDown(self):
1081 self.stack.close()
1082 super().tearDown()
1083
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001084 def walk(self, top, **kwargs):
1085 if 'follow_symlinks' in kwargs:
1086 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1087 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1088 root = os.fsdecode(broot)
1089 dirs = list(map(os.fsdecode, bdirs))
1090 files = list(map(os.fsdecode, bfiles))
1091 yield (root, dirs, files)
1092 bdirs[:] = list(map(os.fsencode, dirs))
1093 bfiles[:] = list(map(os.fsencode, files))
1094
Charles-François Natali7372b062012-02-05 15:15:38 +01001095
Guido van Rossume7ba4952007-06-06 23:52:48 +00001096class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001097 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001098 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001099
1100 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001101 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001102 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1103 os.makedirs(path) # Should work
1104 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1105 os.makedirs(path)
1106
1107 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001108 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001109 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1110 os.makedirs(path)
1111 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1112 'dir5', 'dir6')
1113 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001114
Terry Reedy5a22b652010-12-02 07:05:56 +00001115 def test_exist_ok_existing_directory(self):
1116 path = os.path.join(support.TESTFN, 'dir1')
1117 mode = 0o777
1118 old_mask = os.umask(0o022)
1119 os.makedirs(path, mode)
1120 self.assertRaises(OSError, os.makedirs, path, mode)
1121 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001122 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001123 os.makedirs(path, mode=mode, exist_ok=True)
1124 os.umask(old_mask)
1125
Martin Pantera82642f2015-11-19 04:48:44 +00001126 # Issue #25583: A drive root could raise PermissionError on Windows
1127 os.makedirs(os.path.abspath('/'), exist_ok=True)
1128
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001129 def test_exist_ok_s_isgid_directory(self):
1130 path = os.path.join(support.TESTFN, 'dir1')
1131 S_ISGID = stat.S_ISGID
1132 mode = 0o777
1133 old_mask = os.umask(0o022)
1134 try:
1135 existing_testfn_mode = stat.S_IMODE(
1136 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001137 try:
1138 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001139 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001140 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001141 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1142 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1143 # The os should apply S_ISGID from the parent dir for us, but
1144 # this test need not depend on that behavior. Be explicit.
1145 os.makedirs(path, mode | S_ISGID)
1146 # http://bugs.python.org/issue14992
1147 # Should not fail when the bit is already set.
1148 os.makedirs(path, mode, exist_ok=True)
1149 # remove the bit.
1150 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001151 # May work even when the bit is not already set when demanded.
1152 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001153 finally:
1154 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001155
1156 def test_exist_ok_existing_regular_file(self):
1157 base = support.TESTFN
1158 path = os.path.join(support.TESTFN, 'dir1')
1159 f = open(path, 'w')
1160 f.write('abc')
1161 f.close()
1162 self.assertRaises(OSError, os.makedirs, path)
1163 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1164 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1165 os.remove(path)
1166
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001167 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001168 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001169 'dir4', 'dir5', 'dir6')
1170 # If the tests failed, the bottom-most directory ('../dir6')
1171 # may not have been created, so we look for the outermost directory
1172 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001173 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001174 path = os.path.dirname(path)
1175
1176 os.removedirs(path)
1177
Andrew Svetlov405faed2012-12-25 12:18:09 +02001178
R David Murrayf2ad1732014-12-25 18:36:56 -05001179@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1180class ChownFileTests(unittest.TestCase):
1181
Berker Peksag036a71b2015-07-21 09:29:48 +03001182 @classmethod
1183 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001184 os.mkdir(support.TESTFN)
1185
1186 def test_chown_uid_gid_arguments_must_be_index(self):
1187 stat = os.stat(support.TESTFN)
1188 uid = stat.st_uid
1189 gid = stat.st_gid
1190 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1191 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1192 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1193 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1194 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1195
1196 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1197 def test_chown(self):
1198 gid_1, gid_2 = groups[:2]
1199 uid = os.stat(support.TESTFN).st_uid
1200 os.chown(support.TESTFN, uid, gid_1)
1201 gid = os.stat(support.TESTFN).st_gid
1202 self.assertEqual(gid, gid_1)
1203 os.chown(support.TESTFN, uid, gid_2)
1204 gid = os.stat(support.TESTFN).st_gid
1205 self.assertEqual(gid, gid_2)
1206
1207 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1208 "test needs root privilege and more than one user")
1209 def test_chown_with_root(self):
1210 uid_1, uid_2 = all_users[:2]
1211 gid = os.stat(support.TESTFN).st_gid
1212 os.chown(support.TESTFN, uid_1, gid)
1213 uid = os.stat(support.TESTFN).st_uid
1214 self.assertEqual(uid, uid_1)
1215 os.chown(support.TESTFN, uid_2, gid)
1216 uid = os.stat(support.TESTFN).st_uid
1217 self.assertEqual(uid, uid_2)
1218
1219 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1220 "test needs non-root account and more than one user")
1221 def test_chown_without_permission(self):
1222 uid_1, uid_2 = all_users[:2]
1223 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001224 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001225 os.chown(support.TESTFN, uid_1, gid)
1226 os.chown(support.TESTFN, uid_2, gid)
1227
Berker Peksag036a71b2015-07-21 09:29:48 +03001228 @classmethod
1229 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001230 os.rmdir(support.TESTFN)
1231
1232
Andrew Svetlov405faed2012-12-25 12:18:09 +02001233class RemoveDirsTests(unittest.TestCase):
1234 def setUp(self):
1235 os.makedirs(support.TESTFN)
1236
1237 def tearDown(self):
1238 support.rmtree(support.TESTFN)
1239
1240 def test_remove_all(self):
1241 dira = os.path.join(support.TESTFN, 'dira')
1242 os.mkdir(dira)
1243 dirb = os.path.join(dira, 'dirb')
1244 os.mkdir(dirb)
1245 os.removedirs(dirb)
1246 self.assertFalse(os.path.exists(dirb))
1247 self.assertFalse(os.path.exists(dira))
1248 self.assertFalse(os.path.exists(support.TESTFN))
1249
1250 def test_remove_partial(self):
1251 dira = os.path.join(support.TESTFN, 'dira')
1252 os.mkdir(dira)
1253 dirb = os.path.join(dira, 'dirb')
1254 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001255 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001256 os.removedirs(dirb)
1257 self.assertFalse(os.path.exists(dirb))
1258 self.assertTrue(os.path.exists(dira))
1259 self.assertTrue(os.path.exists(support.TESTFN))
1260
1261 def test_remove_nothing(self):
1262 dira = os.path.join(support.TESTFN, 'dira')
1263 os.mkdir(dira)
1264 dirb = os.path.join(dira, 'dirb')
1265 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001266 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001267 with self.assertRaises(OSError):
1268 os.removedirs(dirb)
1269 self.assertTrue(os.path.exists(dirb))
1270 self.assertTrue(os.path.exists(dira))
1271 self.assertTrue(os.path.exists(support.TESTFN))
1272
1273
Guido van Rossume7ba4952007-06-06 23:52:48 +00001274class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001275 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001276 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001277 f.write(b'hello')
1278 f.close()
1279 with open(os.devnull, 'rb') as f:
1280 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001281
Andrew Svetlov405faed2012-12-25 12:18:09 +02001282
Guido van Rossume7ba4952007-06-06 23:52:48 +00001283class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001284 def test_urandom_length(self):
1285 self.assertEqual(len(os.urandom(0)), 0)
1286 self.assertEqual(len(os.urandom(1)), 1)
1287 self.assertEqual(len(os.urandom(10)), 10)
1288 self.assertEqual(len(os.urandom(100)), 100)
1289 self.assertEqual(len(os.urandom(1000)), 1000)
1290
1291 def test_urandom_value(self):
1292 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001293 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001294 data2 = os.urandom(16)
1295 self.assertNotEqual(data1, data2)
1296
1297 def get_urandom_subprocess(self, count):
1298 code = '\n'.join((
1299 'import os, sys',
1300 'data = os.urandom(%s)' % count,
1301 'sys.stdout.buffer.write(data)',
1302 'sys.stdout.buffer.flush()'))
1303 out = assert_python_ok('-c', code)
1304 stdout = out[1]
1305 self.assertEqual(len(stdout), 16)
1306 return stdout
1307
1308 def test_urandom_subprocess(self):
1309 data1 = self.get_urandom_subprocess(16)
1310 data2 = self.get_urandom_subprocess(16)
1311 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001312
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001313
Victor Stinner9b1f4742016-09-06 16:18:52 -07001314@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1315class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001316 @classmethod
1317 def setUpClass(cls):
1318 try:
1319 os.getrandom(1)
1320 except OSError as exc:
1321 if exc.errno == errno.ENOSYS:
1322 # Python compiled on a more recent Linux version
1323 # than the current Linux kernel
1324 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1325 else:
1326 raise
1327
Victor Stinner9b1f4742016-09-06 16:18:52 -07001328 def test_getrandom_type(self):
1329 data = os.getrandom(16)
1330 self.assertIsInstance(data, bytes)
1331 self.assertEqual(len(data), 16)
1332
1333 def test_getrandom0(self):
1334 empty = os.getrandom(0)
1335 self.assertEqual(empty, b'')
1336
1337 def test_getrandom_random(self):
1338 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1339
1340 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1341 # resource /dev/random
1342
1343 def test_getrandom_nonblock(self):
1344 # The call must not fail. Check also that the flag exists
1345 try:
1346 os.getrandom(1, os.GRND_NONBLOCK)
1347 except BlockingIOError:
1348 # System urandom is not initialized yet
1349 pass
1350
1351 def test_getrandom_value(self):
1352 data1 = os.getrandom(16)
1353 data2 = os.getrandom(16)
1354 self.assertNotEqual(data1, data2)
1355
1356
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001357# os.urandom() doesn't use a file descriptor when it is implemented with the
1358# getentropy() function, the getrandom() function or the getrandom() syscall
1359OS_URANDOM_DONT_USE_FD = (
1360 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1361 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1362 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001363
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001364@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1365 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001366class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001367 @unittest.skipUnless(resource, "test requires the resource module")
1368 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001369 # Check urandom() failing when it is not able to open /dev/random.
1370 # We spawn a new process to make the test more robust (if getrlimit()
1371 # failed to restore the file descriptor limit after this, the whole
1372 # test suite would crash; this actually happened on the OS X Tiger
1373 # buildbot).
1374 code = """if 1:
1375 import errno
1376 import os
1377 import resource
1378
1379 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1380 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1381 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001382 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001383 except OSError as e:
1384 assert e.errno == errno.EMFILE, e.errno
1385 else:
1386 raise AssertionError("OSError not raised")
1387 """
1388 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001389
Antoine Pitroue472aea2014-04-26 14:33:03 +02001390 def test_urandom_fd_closed(self):
1391 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1392 # closed.
1393 code = """if 1:
1394 import os
1395 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001396 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001397 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001398 with test.support.SuppressCrashReport():
1399 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001400 sys.stdout.buffer.write(os.urandom(4))
1401 """
1402 rc, out, err = assert_python_ok('-Sc', code)
1403
1404 def test_urandom_fd_reopened(self):
1405 # Issue #21207: urandom() should detect its fd to /dev/urandom
1406 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001407 self.addCleanup(support.unlink, support.TESTFN)
1408 create_file(support.TESTFN, b"x" * 256)
1409
Antoine Pitroue472aea2014-04-26 14:33:03 +02001410 code = """if 1:
1411 import os
1412 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001413 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001414 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001415 with test.support.SuppressCrashReport():
1416 for fd in range(3, 256):
1417 try:
1418 os.close(fd)
1419 except OSError:
1420 pass
1421 else:
1422 # Found the urandom fd (XXX hopefully)
1423 break
1424 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001425 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001426 new_fd = f.fileno()
1427 # Issue #26935: posix allows new_fd and fd to be equal but
1428 # some libc implementations have dup2 return an error in this
1429 # case.
1430 if new_fd != fd:
1431 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001432 sys.stdout.buffer.write(os.urandom(4))
1433 sys.stdout.buffer.write(os.urandom(4))
1434 """.format(TESTFN=support.TESTFN)
1435 rc, out, err = assert_python_ok('-Sc', code)
1436 self.assertEqual(len(out), 8)
1437 self.assertNotEqual(out[0:4], out[4:8])
1438 rc, out2, err2 = assert_python_ok('-Sc', code)
1439 self.assertEqual(len(out2), 8)
1440 self.assertNotEqual(out2, out)
1441
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001442
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001443@contextlib.contextmanager
1444def _execvpe_mockup(defpath=None):
1445 """
1446 Stubs out execv and execve functions when used as context manager.
1447 Records exec calls. The mock execv and execve functions always raise an
1448 exception as they would normally never return.
1449 """
1450 # A list of tuples containing (function name, first arg, args)
1451 # of calls to execv or execve that have been made.
1452 calls = []
1453
1454 def mock_execv(name, *args):
1455 calls.append(('execv', name, args))
1456 raise RuntimeError("execv called")
1457
1458 def mock_execve(name, *args):
1459 calls.append(('execve', name, args))
1460 raise OSError(errno.ENOTDIR, "execve called")
1461
1462 try:
1463 orig_execv = os.execv
1464 orig_execve = os.execve
1465 orig_defpath = os.defpath
1466 os.execv = mock_execv
1467 os.execve = mock_execve
1468 if defpath is not None:
1469 os.defpath = defpath
1470 yield calls
1471 finally:
1472 os.execv = orig_execv
1473 os.execve = orig_execve
1474 os.defpath = orig_defpath
1475
Victor Stinner4659ccf2016-09-14 10:57:00 +02001476
Guido van Rossume7ba4952007-06-06 23:52:48 +00001477class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001478 @unittest.skipIf(USING_LINUXTHREADS,
1479 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001480 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001481 self.assertRaises(OSError, os.execvpe, 'no such app-',
1482 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001483
Steve Dowerbce26262016-11-19 19:17:26 -08001484 def test_execv_with_bad_arglist(self):
1485 self.assertRaises(ValueError, os.execv, 'notepad', ())
1486 self.assertRaises(ValueError, os.execv, 'notepad', [])
1487 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1488 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1489
Thomas Heller6790d602007-08-30 17:15:14 +00001490 def test_execvpe_with_bad_arglist(self):
1491 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001492 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1493 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001494
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001495 @unittest.skipUnless(hasattr(os, '_execvpe'),
1496 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001497 def _test_internal_execvpe(self, test_type):
1498 program_path = os.sep + 'absolutepath'
1499 if test_type is bytes:
1500 program = b'executable'
1501 fullpath = os.path.join(os.fsencode(program_path), program)
1502 native_fullpath = fullpath
1503 arguments = [b'progname', 'arg1', 'arg2']
1504 else:
1505 program = 'executable'
1506 arguments = ['progname', 'arg1', 'arg2']
1507 fullpath = os.path.join(program_path, program)
1508 if os.name != "nt":
1509 native_fullpath = os.fsencode(fullpath)
1510 else:
1511 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001512 env = {'spam': 'beans'}
1513
Victor Stinnerb745a742010-05-18 17:17:23 +00001514 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001515 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001516 self.assertRaises(RuntimeError,
1517 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001518 self.assertEqual(len(calls), 1)
1519 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1520
Victor Stinnerb745a742010-05-18 17:17:23 +00001521 # test os._execvpe() with a relative path:
1522 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001523 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001524 self.assertRaises(OSError,
1525 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001526 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001527 self.assertSequenceEqual(calls[0],
1528 ('execve', native_fullpath, (arguments, env)))
1529
1530 # test os._execvpe() with a relative path:
1531 # os.get_exec_path() reads the 'PATH' variable
1532 with _execvpe_mockup() as calls:
1533 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001534 if test_type is bytes:
1535 env_path[b'PATH'] = program_path
1536 else:
1537 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001538 self.assertRaises(OSError,
1539 os._execvpe, program, arguments, env=env_path)
1540 self.assertEqual(len(calls), 1)
1541 self.assertSequenceEqual(calls[0],
1542 ('execve', native_fullpath, (arguments, env_path)))
1543
1544 def test_internal_execvpe_str(self):
1545 self._test_internal_execvpe(str)
1546 if os.name != "nt":
1547 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001548
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001549
Serhiy Storchaka43767632013-11-03 21:31:38 +02001550@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001551class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001552 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001553 try:
1554 os.stat(support.TESTFN)
1555 except FileNotFoundError:
1556 exists = False
1557 except OSError as exc:
1558 exists = True
1559 self.fail("file %s must not exist; os.stat failed with %s"
1560 % (support.TESTFN, exc))
1561 else:
1562 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001563
Thomas Wouters477c8d52006-05-27 19:21:47 +00001564 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001565 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001566
1567 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001568 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001569
1570 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001571 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001572
1573 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001574 self.addCleanup(support.unlink, support.TESTFN)
1575
Victor Stinnere77c9742016-03-25 10:28:23 +01001576 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001577 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001578
1579 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001580 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001581
Thomas Wouters477c8d52006-05-27 19:21:47 +00001582 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001583 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001584
Victor Stinnere77c9742016-03-25 10:28:23 +01001585
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001586class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001587 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001588 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1589 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001590 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001591 def get_single(f):
1592 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001593 if hasattr(os, f):
1594 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001595 return helper
1596 for f in singles:
1597 locals()["test_"+f] = get_single(f)
1598
Benjamin Peterson7522c742009-01-19 21:00:09 +00001599 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001600 try:
1601 f(support.make_bad_fd(), *args)
1602 except OSError as e:
1603 self.assertEqual(e.errno, errno.EBADF)
1604 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001605 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001606 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001607
Serhiy Storchaka43767632013-11-03 21:31:38 +02001608 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001609 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001610 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001611
Serhiy Storchaka43767632013-11-03 21:31:38 +02001612 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001613 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001614 fd = support.make_bad_fd()
1615 # Make sure none of the descriptors we are about to close are
1616 # currently valid (issue 6542).
1617 for i in range(10):
1618 try: os.fstat(fd+i)
1619 except OSError:
1620 pass
1621 else:
1622 break
1623 if i < 2:
1624 raise unittest.SkipTest(
1625 "Unable to acquire a range of invalid file descriptors")
1626 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001627
Serhiy Storchaka43767632013-11-03 21:31:38 +02001628 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001629 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001630 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001631
Serhiy Storchaka43767632013-11-03 21:31:38 +02001632 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001633 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001634 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001635
Serhiy Storchaka43767632013-11-03 21:31:38 +02001636 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001637 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001638 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001639
Serhiy Storchaka43767632013-11-03 21:31:38 +02001640 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001641 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001642 self.check(os.pathconf, "PC_NAME_MAX")
1643 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001644
Serhiy Storchaka43767632013-11-03 21:31:38 +02001645 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001646 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001647 self.check(os.truncate, 0)
1648 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001649
Serhiy Storchaka43767632013-11-03 21:31:38 +02001650 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001651 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001652 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001653
Serhiy Storchaka43767632013-11-03 21:31:38 +02001654 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001655 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001656 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001657
Victor Stinner57ddf782014-01-08 15:21:28 +01001658 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1659 def test_readv(self):
1660 buf = bytearray(10)
1661 self.check(os.readv, [buf])
1662
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001666
Serhiy Storchaka43767632013-11-03 21:31:38 +02001667 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001668 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001669 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001670
Victor Stinner57ddf782014-01-08 15:21:28 +01001671 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1672 def test_writev(self):
1673 self.check(os.writev, [b'abc'])
1674
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001675 def test_inheritable(self):
1676 self.check(os.get_inheritable)
1677 self.check(os.set_inheritable, True)
1678
1679 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1680 'needs os.get_blocking() and os.set_blocking()')
1681 def test_blocking(self):
1682 self.check(os.get_blocking)
1683 self.check(os.set_blocking, True)
1684
Brian Curtin1b9df392010-11-24 20:24:31 +00001685
1686class LinkTests(unittest.TestCase):
1687 def setUp(self):
1688 self.file1 = support.TESTFN
1689 self.file2 = os.path.join(support.TESTFN + "2")
1690
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001691 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001692 for file in (self.file1, self.file2):
1693 if os.path.exists(file):
1694 os.unlink(file)
1695
Brian Curtin1b9df392010-11-24 20:24:31 +00001696 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001697 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001698
Steve Dowercc16be82016-09-08 10:35:16 -07001699 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001700 with open(file1, "r") as f1, open(file2, "r") as f2:
1701 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1702
1703 def test_link(self):
1704 self._test_link(self.file1, self.file2)
1705
1706 def test_link_bytes(self):
1707 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1708 bytes(self.file2, sys.getfilesystemencoding()))
1709
Brian Curtinf498b752010-11-30 15:54:04 +00001710 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001711 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001712 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001713 except UnicodeError:
1714 raise unittest.SkipTest("Unable to encode for this platform.")
1715
Brian Curtinf498b752010-11-30 15:54:04 +00001716 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001717 self.file2 = self.file1 + "2"
1718 self._test_link(self.file1, self.file2)
1719
Serhiy Storchaka43767632013-11-03 21:31:38 +02001720@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1721class PosixUidGidTests(unittest.TestCase):
1722 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1723 def test_setuid(self):
1724 if os.getuid() != 0:
1725 self.assertRaises(OSError, os.setuid, 0)
1726 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001727
Serhiy Storchaka43767632013-11-03 21:31:38 +02001728 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1729 def test_setgid(self):
1730 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1731 self.assertRaises(OSError, os.setgid, 0)
1732 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001733
Serhiy Storchaka43767632013-11-03 21:31:38 +02001734 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1735 def test_seteuid(self):
1736 if os.getuid() != 0:
1737 self.assertRaises(OSError, os.seteuid, 0)
1738 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001739
Serhiy Storchaka43767632013-11-03 21:31:38 +02001740 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1741 def test_setegid(self):
1742 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1743 self.assertRaises(OSError, os.setegid, 0)
1744 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001745
Serhiy Storchaka43767632013-11-03 21:31:38 +02001746 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1747 def test_setreuid(self):
1748 if os.getuid() != 0:
1749 self.assertRaises(OSError, os.setreuid, 0, 0)
1750 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1751 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001752
Serhiy Storchaka43767632013-11-03 21:31:38 +02001753 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1754 def test_setreuid_neg1(self):
1755 # Needs to accept -1. We run this in a subprocess to avoid
1756 # altering the test runner's process state (issue8045).
1757 subprocess.check_call([
1758 sys.executable, '-c',
1759 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001760
Serhiy Storchaka43767632013-11-03 21:31:38 +02001761 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1762 def test_setregid(self):
1763 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1764 self.assertRaises(OSError, os.setregid, 0, 0)
1765 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1766 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001767
Serhiy Storchaka43767632013-11-03 21:31:38 +02001768 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1769 def test_setregid_neg1(self):
1770 # Needs to accept -1. We run this in a subprocess to avoid
1771 # altering the test runner's process state (issue8045).
1772 subprocess.check_call([
1773 sys.executable, '-c',
1774 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001775
Serhiy Storchaka43767632013-11-03 21:31:38 +02001776@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1777class Pep383Tests(unittest.TestCase):
1778 def setUp(self):
1779 if support.TESTFN_UNENCODABLE:
1780 self.dir = support.TESTFN_UNENCODABLE
1781 elif support.TESTFN_NONASCII:
1782 self.dir = support.TESTFN_NONASCII
1783 else:
1784 self.dir = support.TESTFN
1785 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001786
Serhiy Storchaka43767632013-11-03 21:31:38 +02001787 bytesfn = []
1788 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001789 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001790 fn = os.fsencode(fn)
1791 except UnicodeEncodeError:
1792 return
1793 bytesfn.append(fn)
1794 add_filename(support.TESTFN_UNICODE)
1795 if support.TESTFN_UNENCODABLE:
1796 add_filename(support.TESTFN_UNENCODABLE)
1797 if support.TESTFN_NONASCII:
1798 add_filename(support.TESTFN_NONASCII)
1799 if not bytesfn:
1800 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001801
Serhiy Storchaka43767632013-11-03 21:31:38 +02001802 self.unicodefn = set()
1803 os.mkdir(self.dir)
1804 try:
1805 for fn in bytesfn:
1806 support.create_empty_file(os.path.join(self.bdir, fn))
1807 fn = os.fsdecode(fn)
1808 if fn in self.unicodefn:
1809 raise ValueError("duplicate filename")
1810 self.unicodefn.add(fn)
1811 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001812 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001813 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001814
Serhiy Storchaka43767632013-11-03 21:31:38 +02001815 def tearDown(self):
1816 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001817
Serhiy Storchaka43767632013-11-03 21:31:38 +02001818 def test_listdir(self):
1819 expected = self.unicodefn
1820 found = set(os.listdir(self.dir))
1821 self.assertEqual(found, expected)
1822 # test listdir without arguments
1823 current_directory = os.getcwd()
1824 try:
1825 os.chdir(os.sep)
1826 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1827 finally:
1828 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001829
Serhiy Storchaka43767632013-11-03 21:31:38 +02001830 def test_open(self):
1831 for fn in self.unicodefn:
1832 f = open(os.path.join(self.dir, fn), 'rb')
1833 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001834
Serhiy Storchaka43767632013-11-03 21:31:38 +02001835 @unittest.skipUnless(hasattr(os, 'statvfs'),
1836 "need os.statvfs()")
1837 def test_statvfs(self):
1838 # issue #9645
1839 for fn in self.unicodefn:
1840 # should not fail with file not found error
1841 fullname = os.path.join(self.dir, fn)
1842 os.statvfs(fullname)
1843
1844 def test_stat(self):
1845 for fn in self.unicodefn:
1846 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001847
Brian Curtineb24d742010-04-12 17:16:38 +00001848@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1849class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001850 def _kill(self, sig):
1851 # Start sys.executable as a subprocess and communicate from the
1852 # subprocess to the parent that the interpreter is ready. When it
1853 # becomes ready, send *sig* via os.kill to the subprocess and check
1854 # that the return code is equal to *sig*.
1855 import ctypes
1856 from ctypes import wintypes
1857 import msvcrt
1858
1859 # Since we can't access the contents of the process' stdout until the
1860 # process has exited, use PeekNamedPipe to see what's inside stdout
1861 # without waiting. This is done so we can tell that the interpreter
1862 # is started and running at a point where it could handle a signal.
1863 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1864 PeekNamedPipe.restype = wintypes.BOOL
1865 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1866 ctypes.POINTER(ctypes.c_char), # stdout buf
1867 wintypes.DWORD, # Buffer size
1868 ctypes.POINTER(wintypes.DWORD), # bytes read
1869 ctypes.POINTER(wintypes.DWORD), # bytes avail
1870 ctypes.POINTER(wintypes.DWORD)) # bytes left
1871 msg = "running"
1872 proc = subprocess.Popen([sys.executable, "-c",
1873 "import sys;"
1874 "sys.stdout.write('{}');"
1875 "sys.stdout.flush();"
1876 "input()".format(msg)],
1877 stdout=subprocess.PIPE,
1878 stderr=subprocess.PIPE,
1879 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001880 self.addCleanup(proc.stdout.close)
1881 self.addCleanup(proc.stderr.close)
1882 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001883
1884 count, max = 0, 100
1885 while count < max and proc.poll() is None:
1886 # Create a string buffer to store the result of stdout from the pipe
1887 buf = ctypes.create_string_buffer(len(msg))
1888 # Obtain the text currently in proc.stdout
1889 # Bytes read/avail/left are left as NULL and unused
1890 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1891 buf, ctypes.sizeof(buf), None, None, None)
1892 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1893 if buf.value:
1894 self.assertEqual(msg, buf.value.decode())
1895 break
1896 time.sleep(0.1)
1897 count += 1
1898 else:
1899 self.fail("Did not receive communication from the subprocess")
1900
Brian Curtineb24d742010-04-12 17:16:38 +00001901 os.kill(proc.pid, sig)
1902 self.assertEqual(proc.wait(), sig)
1903
1904 def test_kill_sigterm(self):
1905 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001906 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001907
1908 def test_kill_int(self):
1909 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001910 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001911
1912 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001913 tagname = "test_os_%s" % uuid.uuid1()
1914 m = mmap.mmap(-1, 1, tagname)
1915 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001916 # Run a script which has console control handling enabled.
1917 proc = subprocess.Popen([sys.executable,
1918 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001919 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001920 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1921 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001922 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001923 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001924 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001925 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001926 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001927 count += 1
1928 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001929 # Forcefully kill the process if we weren't able to signal it.
1930 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001931 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001932 os.kill(proc.pid, event)
1933 # proc.send_signal(event) could also be done here.
1934 # Allow time for the signal to be passed and the process to exit.
1935 time.sleep(0.5)
1936 if not proc.poll():
1937 # Forcefully kill the process if we weren't able to signal it.
1938 os.kill(proc.pid, signal.SIGINT)
1939 self.fail("subprocess did not stop on {}".format(name))
1940
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001941 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001942 def test_CTRL_C_EVENT(self):
1943 from ctypes import wintypes
1944 import ctypes
1945
1946 # Make a NULL value by creating a pointer with no argument.
1947 NULL = ctypes.POINTER(ctypes.c_int)()
1948 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1949 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1950 wintypes.BOOL)
1951 SetConsoleCtrlHandler.restype = wintypes.BOOL
1952
1953 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001954 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001955 # by subprocesses.
1956 SetConsoleCtrlHandler(NULL, 0)
1957
1958 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1959
1960 def test_CTRL_BREAK_EVENT(self):
1961 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1962
1963
Brian Curtind40e6f72010-07-08 21:39:08 +00001964@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001965class Win32ListdirTests(unittest.TestCase):
1966 """Test listdir on Windows."""
1967
1968 def setUp(self):
1969 self.created_paths = []
1970 for i in range(2):
1971 dir_name = 'SUB%d' % i
1972 dir_path = os.path.join(support.TESTFN, dir_name)
1973 file_name = 'FILE%d' % i
1974 file_path = os.path.join(support.TESTFN, file_name)
1975 os.makedirs(dir_path)
1976 with open(file_path, 'w') as f:
1977 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1978 self.created_paths.extend([dir_name, file_name])
1979 self.created_paths.sort()
1980
1981 def tearDown(self):
1982 shutil.rmtree(support.TESTFN)
1983
1984 def test_listdir_no_extended_path(self):
1985 """Test when the path is not an "extended" path."""
1986 # unicode
1987 self.assertEqual(
1988 sorted(os.listdir(support.TESTFN)),
1989 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01001990
Tim Golden781bbeb2013-10-25 20:24:06 +01001991 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07001992 self.assertEqual(
1993 sorted(os.listdir(os.fsencode(support.TESTFN))),
1994 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01001995
1996 def test_listdir_extended_path(self):
1997 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001998 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001999 # unicode
2000 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2001 self.assertEqual(
2002 sorted(os.listdir(path)),
2003 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002004
Tim Golden781bbeb2013-10-25 20:24:06 +01002005 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002006 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2007 self.assertEqual(
2008 sorted(os.listdir(path)),
2009 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002010
2011
2012@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002013@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002014class Win32SymlinkTests(unittest.TestCase):
2015 filelink = 'filelinktest'
2016 filelink_target = os.path.abspath(__file__)
2017 dirlink = 'dirlinktest'
2018 dirlink_target = os.path.dirname(filelink_target)
2019 missing_link = 'missing link'
2020
2021 def setUp(self):
2022 assert os.path.exists(self.dirlink_target)
2023 assert os.path.exists(self.filelink_target)
2024 assert not os.path.exists(self.dirlink)
2025 assert not os.path.exists(self.filelink)
2026 assert not os.path.exists(self.missing_link)
2027
2028 def tearDown(self):
2029 if os.path.exists(self.filelink):
2030 os.remove(self.filelink)
2031 if os.path.exists(self.dirlink):
2032 os.rmdir(self.dirlink)
2033 if os.path.lexists(self.missing_link):
2034 os.remove(self.missing_link)
2035
2036 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002037 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002038 self.assertTrue(os.path.exists(self.dirlink))
2039 self.assertTrue(os.path.isdir(self.dirlink))
2040 self.assertTrue(os.path.islink(self.dirlink))
2041 self.check_stat(self.dirlink, self.dirlink_target)
2042
2043 def test_file_link(self):
2044 os.symlink(self.filelink_target, self.filelink)
2045 self.assertTrue(os.path.exists(self.filelink))
2046 self.assertTrue(os.path.isfile(self.filelink))
2047 self.assertTrue(os.path.islink(self.filelink))
2048 self.check_stat(self.filelink, self.filelink_target)
2049
2050 def _create_missing_dir_link(self):
2051 'Create a "directory" link to a non-existent target'
2052 linkname = self.missing_link
2053 if os.path.lexists(linkname):
2054 os.remove(linkname)
2055 target = r'c:\\target does not exist.29r3c740'
2056 assert not os.path.exists(target)
2057 target_is_dir = True
2058 os.symlink(target, linkname, target_is_dir)
2059
2060 def test_remove_directory_link_to_missing_target(self):
2061 self._create_missing_dir_link()
2062 # For compatibility with Unix, os.remove will check the
2063 # directory status and call RemoveDirectory if the symlink
2064 # was created with target_is_dir==True.
2065 os.remove(self.missing_link)
2066
2067 @unittest.skip("currently fails; consider for improvement")
2068 def test_isdir_on_directory_link_to_missing_target(self):
2069 self._create_missing_dir_link()
2070 # consider having isdir return true for directory links
2071 self.assertTrue(os.path.isdir(self.missing_link))
2072
2073 @unittest.skip("currently fails; consider for improvement")
2074 def test_rmdir_on_directory_link_to_missing_target(self):
2075 self._create_missing_dir_link()
2076 # consider allowing rmdir to remove directory links
2077 os.rmdir(self.missing_link)
2078
2079 def check_stat(self, link, target):
2080 self.assertEqual(os.stat(link), os.stat(target))
2081 self.assertNotEqual(os.lstat(link), os.stat(link))
2082
Brian Curtind25aef52011-06-13 15:16:04 -05002083 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002084 self.assertEqual(os.stat(bytes_link), os.stat(target))
2085 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002086
2087 def test_12084(self):
2088 level1 = os.path.abspath(support.TESTFN)
2089 level2 = os.path.join(level1, "level2")
2090 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002091 self.addCleanup(support.rmtree, level1)
2092
2093 os.mkdir(level1)
2094 os.mkdir(level2)
2095 os.mkdir(level3)
2096
2097 file1 = os.path.abspath(os.path.join(level1, "file1"))
2098 create_file(file1)
2099
2100 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002101 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002102 os.chdir(level2)
2103 link = os.path.join(level2, "link")
2104 os.symlink(os.path.relpath(file1), "link")
2105 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002106
Victor Stinnerae39d232016-03-24 17:12:55 +01002107 # Check os.stat calls from the same dir as the link
2108 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002109
Victor Stinnerae39d232016-03-24 17:12:55 +01002110 # Check os.stat calls from a dir below the link
2111 os.chdir(level1)
2112 self.assertEqual(os.stat(file1),
2113 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002114
Victor Stinnerae39d232016-03-24 17:12:55 +01002115 # Check os.stat calls from a dir above the link
2116 os.chdir(level3)
2117 self.assertEqual(os.stat(file1),
2118 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002119 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002120 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002121
Brian Curtind40e6f72010-07-08 21:39:08 +00002122
Tim Golden0321cf22014-05-05 19:46:17 +01002123@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2124class Win32JunctionTests(unittest.TestCase):
2125 junction = 'junctiontest'
2126 junction_target = os.path.dirname(os.path.abspath(__file__))
2127
2128 def setUp(self):
2129 assert os.path.exists(self.junction_target)
2130 assert not os.path.exists(self.junction)
2131
2132 def tearDown(self):
2133 if os.path.exists(self.junction):
2134 # os.rmdir delegates to Windows' RemoveDirectoryW,
2135 # which removes junction points safely.
2136 os.rmdir(self.junction)
2137
2138 def test_create_junction(self):
2139 _winapi.CreateJunction(self.junction_target, self.junction)
2140 self.assertTrue(os.path.exists(self.junction))
2141 self.assertTrue(os.path.isdir(self.junction))
2142
2143 # Junctions are not recognized as links.
2144 self.assertFalse(os.path.islink(self.junction))
2145
2146 def test_unlink_removes_junction(self):
2147 _winapi.CreateJunction(self.junction_target, self.junction)
2148 self.assertTrue(os.path.exists(self.junction))
2149
2150 os.unlink(self.junction)
2151 self.assertFalse(os.path.exists(self.junction))
2152
2153
Jason R. Coombs3a092862013-05-27 23:21:28 -04002154@support.skip_unless_symlink
2155class NonLocalSymlinkTests(unittest.TestCase):
2156
2157 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002158 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002159 Create this structure:
2160
2161 base
2162 \___ some_dir
2163 """
2164 os.makedirs('base/some_dir')
2165
2166 def tearDown(self):
2167 shutil.rmtree('base')
2168
2169 def test_directory_link_nonlocal(self):
2170 """
2171 The symlink target should resolve relative to the link, not relative
2172 to the current directory.
2173
2174 Then, link base/some_link -> base/some_dir and ensure that some_link
2175 is resolved as a directory.
2176
2177 In issue13772, it was discovered that directory detection failed if
2178 the symlink target was not specified relative to the current
2179 directory, which was a defect in the implementation.
2180 """
2181 src = os.path.join('base', 'some_link')
2182 os.symlink('some_dir', src)
2183 assert os.path.isdir(src)
2184
2185
Victor Stinnere8d51452010-08-19 01:05:19 +00002186class FSEncodingTests(unittest.TestCase):
2187 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002188 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2189 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002190
Victor Stinnere8d51452010-08-19 01:05:19 +00002191 def test_identity(self):
2192 # assert fsdecode(fsencode(x)) == x
2193 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2194 try:
2195 bytesfn = os.fsencode(fn)
2196 except UnicodeEncodeError:
2197 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002199
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002200
Brett Cannonefb00c02012-02-29 18:31:31 -05002201
2202class DeviceEncodingTests(unittest.TestCase):
2203
2204 def test_bad_fd(self):
2205 # Return None when an fd doesn't actually exist.
2206 self.assertIsNone(os.device_encoding(123456))
2207
Philip Jenveye308b7c2012-02-29 16:16:15 -08002208 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2209 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002210 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002211 def test_device_encoding(self):
2212 encoding = os.device_encoding(0)
2213 self.assertIsNotNone(encoding)
2214 self.assertTrue(codecs.lookup(encoding))
2215
2216
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002217class PidTests(unittest.TestCase):
2218 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2219 def test_getppid(self):
2220 p = subprocess.Popen([sys.executable, '-c',
2221 'import os; print(os.getppid())'],
2222 stdout=subprocess.PIPE)
2223 stdout, _ = p.communicate()
2224 # We are the parent of our subprocess
2225 self.assertEqual(int(stdout), os.getpid())
2226
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002227 def test_waitpid(self):
2228 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002229 # Add an implicit test for PyUnicode_FSConverter().
2230 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002231 status = os.waitpid(pid, 0)
2232 self.assertEqual(status, (pid, 0))
2233
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002234
Victor Stinner4659ccf2016-09-14 10:57:00 +02002235class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002236 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002237 self.exitcode = 17
2238
2239 filename = support.TESTFN
2240 self.addCleanup(support.unlink, filename)
2241
2242 if not with_env:
2243 code = 'import sys; sys.exit(%s)' % self.exitcode
2244 else:
2245 self.env = dict(os.environ)
2246 # create an unique key
2247 self.key = str(uuid.uuid4())
2248 self.env[self.key] = self.key
2249 # read the variable from os.environ to check that it exists
2250 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2251 % (self.key, self.exitcode))
2252
2253 with open(filename, "w") as fp:
2254 fp.write(code)
2255
Berker Peksag81816462016-09-15 20:19:47 +03002256 args = [sys.executable, filename]
2257 if use_bytes:
2258 args = [os.fsencode(a) for a in args]
2259 self.env = {os.fsencode(k): os.fsencode(v)
2260 for k, v in self.env.items()}
2261
2262 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002263
Berker Peksag4af23d72016-09-15 20:32:44 +03002264 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002265 def test_spawnl(self):
2266 args = self.create_args()
2267 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2268 self.assertEqual(exitcode, self.exitcode)
2269
Berker Peksag4af23d72016-09-15 20:32:44 +03002270 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002271 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002272 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002273 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2274 self.assertEqual(exitcode, self.exitcode)
2275
Berker Peksag4af23d72016-09-15 20:32:44 +03002276 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002277 def test_spawnlp(self):
2278 args = self.create_args()
2279 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2280 self.assertEqual(exitcode, self.exitcode)
2281
Berker Peksag4af23d72016-09-15 20:32:44 +03002282 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002283 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002284 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002285 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2286 self.assertEqual(exitcode, self.exitcode)
2287
Berker Peksag4af23d72016-09-15 20:32:44 +03002288 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002289 def test_spawnv(self):
2290 args = self.create_args()
2291 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2292 self.assertEqual(exitcode, self.exitcode)
2293
Berker Peksag4af23d72016-09-15 20:32:44 +03002294 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002295 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002296 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002297 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2298 self.assertEqual(exitcode, self.exitcode)
2299
Berker Peksag4af23d72016-09-15 20:32:44 +03002300 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002301 def test_spawnvp(self):
2302 args = self.create_args()
2303 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2304 self.assertEqual(exitcode, self.exitcode)
2305
Berker Peksag4af23d72016-09-15 20:32:44 +03002306 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002307 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002308 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002309 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2310 self.assertEqual(exitcode, self.exitcode)
2311
Berker Peksag4af23d72016-09-15 20:32:44 +03002312 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002313 def test_nowait(self):
2314 args = self.create_args()
2315 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2316 result = os.waitpid(pid, 0)
2317 self.assertEqual(result[0], pid)
2318 status = result[1]
2319 if hasattr(os, 'WIFEXITED'):
2320 self.assertTrue(os.WIFEXITED(status))
2321 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2322 else:
2323 self.assertEqual(status, self.exitcode << 8)
2324
Berker Peksag4af23d72016-09-15 20:32:44 +03002325 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002326 def test_spawnve_bytes(self):
2327 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2328 args = self.create_args(with_env=True, use_bytes=True)
2329 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2330 self.assertEqual(exitcode, self.exitcode)
2331
Steve Dower859fd7b2016-11-19 18:53:19 -08002332 @requires_os_func('spawnl')
2333 def test_spawnl_noargs(self):
2334 args = self.create_args()
2335 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002336 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002337
2338 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002339 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002340 args = self.create_args()
2341 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002342 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002343
2344 @requires_os_func('spawnv')
2345 def test_spawnv_noargs(self):
2346 args = self.create_args()
2347 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2348 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002349 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2350 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002351
2352 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002353 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002354 args = self.create_args()
2355 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2356 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002357 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2358 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002359
Brian Curtin0151b8e2010-09-24 13:43:43 +00002360# The introduction of this TestCase caused at least two different errors on
2361# *nix buildbots. Temporarily skip this to let the buildbots move along.
2362@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002363@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2364class LoginTests(unittest.TestCase):
2365 def test_getlogin(self):
2366 user_name = os.getlogin()
2367 self.assertNotEqual(len(user_name), 0)
2368
2369
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002370@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2371 "needs os.getpriority and os.setpriority")
2372class ProgramPriorityTests(unittest.TestCase):
2373 """Tests for os.getpriority() and os.setpriority()."""
2374
2375 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002376
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002377 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2378 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2379 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002380 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2381 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002382 raise unittest.SkipTest("unable to reliably test setpriority "
2383 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002384 else:
2385 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002386 finally:
2387 try:
2388 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2389 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002390 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002391 raise
2392
2393
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002394if threading is not None:
2395 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002396
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002397 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002398
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002399 def __init__(self, conn):
2400 asynchat.async_chat.__init__(self, conn)
2401 self.in_buffer = []
2402 self.closed = False
2403 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002404
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002405 def handle_read(self):
2406 data = self.recv(4096)
2407 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002408
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002409 def get_data(self):
2410 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002411
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002412 def handle_close(self):
2413 self.close()
2414 self.closed = True
2415
2416 def handle_error(self):
2417 raise
2418
2419 def __init__(self, address):
2420 threading.Thread.__init__(self)
2421 asyncore.dispatcher.__init__(self)
2422 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2423 self.bind(address)
2424 self.listen(5)
2425 self.host, self.port = self.socket.getsockname()[:2]
2426 self.handler_instance = None
2427 self._active = False
2428 self._active_lock = threading.Lock()
2429
2430 # --- public API
2431
2432 @property
2433 def running(self):
2434 return self._active
2435
2436 def start(self):
2437 assert not self.running
2438 self.__flag = threading.Event()
2439 threading.Thread.start(self)
2440 self.__flag.wait()
2441
2442 def stop(self):
2443 assert self.running
2444 self._active = False
2445 self.join()
2446
2447 def wait(self):
2448 # wait for handler connection to be closed, then stop the server
2449 while not getattr(self.handler_instance, "closed", False):
2450 time.sleep(0.001)
2451 self.stop()
2452
2453 # --- internals
2454
2455 def run(self):
2456 self._active = True
2457 self.__flag.set()
2458 while self._active and asyncore.socket_map:
2459 self._active_lock.acquire()
2460 asyncore.loop(timeout=0.001, count=1)
2461 self._active_lock.release()
2462 asyncore.close_all()
2463
2464 def handle_accept(self):
2465 conn, addr = self.accept()
2466 self.handler_instance = self.Handler(conn)
2467
2468 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002469 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002470 handle_read = handle_connect
2471
2472 def writable(self):
2473 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002474
2475 def handle_error(self):
2476 raise
2477
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002478
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002479@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002480@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2481class TestSendfile(unittest.TestCase):
2482
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002483 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002484 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002485 not sys.platform.startswith("solaris") and \
2486 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002487 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2488 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002489
2490 @classmethod
2491 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002492 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002493 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002494
2495 @classmethod
2496 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002497 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002498 support.unlink(support.TESTFN)
2499
2500 def setUp(self):
2501 self.server = SendfileTestServer((support.HOST, 0))
2502 self.server.start()
2503 self.client = socket.socket()
2504 self.client.connect((self.server.host, self.server.port))
2505 self.client.settimeout(1)
2506 # synchronize by waiting for "220 ready" response
2507 self.client.recv(1024)
2508 self.sockno = self.client.fileno()
2509 self.file = open(support.TESTFN, 'rb')
2510 self.fileno = self.file.fileno()
2511
2512 def tearDown(self):
2513 self.file.close()
2514 self.client.close()
2515 if self.server.running:
2516 self.server.stop()
2517
2518 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2519 """A higher level wrapper representing how an application is
2520 supposed to use sendfile().
2521 """
2522 while 1:
2523 try:
2524 if self.SUPPORT_HEADERS_TRAILERS:
2525 return os.sendfile(sock, file, offset, nbytes, headers,
2526 trailers)
2527 else:
2528 return os.sendfile(sock, file, offset, nbytes)
2529 except OSError as err:
2530 if err.errno == errno.ECONNRESET:
2531 # disconnected
2532 raise
2533 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2534 # we have to retry send data
2535 continue
2536 else:
2537 raise
2538
2539 def test_send_whole_file(self):
2540 # normal send
2541 total_sent = 0
2542 offset = 0
2543 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002544 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002545 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2546 if sent == 0:
2547 break
2548 offset += sent
2549 total_sent += sent
2550 self.assertTrue(sent <= nbytes)
2551 self.assertEqual(offset, total_sent)
2552
2553 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002554 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002555 self.client.close()
2556 self.server.wait()
2557 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002558 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002559 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002560
2561 def test_send_at_certain_offset(self):
2562 # start sending a file at a certain offset
2563 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002564 offset = len(self.DATA) // 2
2565 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002566 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002567 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002568 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2569 if sent == 0:
2570 break
2571 offset += sent
2572 total_sent += sent
2573 self.assertTrue(sent <= nbytes)
2574
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002575 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002576 self.client.close()
2577 self.server.wait()
2578 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002579 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002580 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002581 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002582 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002583
2584 def test_offset_overflow(self):
2585 # specify an offset > file size
2586 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002587 try:
2588 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2589 except OSError as e:
2590 # Solaris can raise EINVAL if offset >= file length, ignore.
2591 if e.errno != errno.EINVAL:
2592 raise
2593 else:
2594 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002595 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002596 self.client.close()
2597 self.server.wait()
2598 data = self.server.handler_instance.get_data()
2599 self.assertEqual(data, b'')
2600
2601 def test_invalid_offset(self):
2602 with self.assertRaises(OSError) as cm:
2603 os.sendfile(self.sockno, self.fileno, -1, 4096)
2604 self.assertEqual(cm.exception.errno, errno.EINVAL)
2605
Martin Panterbf19d162015-09-09 01:01:13 +00002606 def test_keywords(self):
2607 # Keyword arguments should be supported
2608 os.sendfile(out=self.sockno, offset=0, count=4096,
2609 **{'in': self.fileno})
2610 if self.SUPPORT_HEADERS_TRAILERS:
2611 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002612 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002613
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002614 # --- headers / trailers tests
2615
Serhiy Storchaka43767632013-11-03 21:31:38 +02002616 @requires_headers_trailers
2617 def test_headers(self):
2618 total_sent = 0
2619 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2620 headers=[b"x" * 512])
2621 total_sent += sent
2622 offset = 4096
2623 nbytes = 4096
2624 while 1:
2625 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2626 offset, nbytes)
2627 if sent == 0:
2628 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002629 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002630 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002631
Serhiy Storchaka43767632013-11-03 21:31:38 +02002632 expected_data = b"x" * 512 + self.DATA
2633 self.assertEqual(total_sent, len(expected_data))
2634 self.client.close()
2635 self.server.wait()
2636 data = self.server.handler_instance.get_data()
2637 self.assertEqual(hash(data), hash(expected_data))
2638
2639 @requires_headers_trailers
2640 def test_trailers(self):
2641 TESTFN2 = support.TESTFN + "2"
2642 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002643
2644 self.addCleanup(support.unlink, TESTFN2)
2645 create_file(TESTFN2, file_data)
2646
2647 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002648 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2649 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002650 self.client.close()
2651 self.server.wait()
2652 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002653 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002654
Serhiy Storchaka43767632013-11-03 21:31:38 +02002655 @requires_headers_trailers
2656 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2657 'test needs os.SF_NODISKIO')
2658 def test_flags(self):
2659 try:
2660 os.sendfile(self.sockno, self.fileno, 0, 4096,
2661 flags=os.SF_NODISKIO)
2662 except OSError as err:
2663 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2664 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002665
2666
Larry Hastings9cf065c2012-06-22 16:30:09 -07002667def supports_extended_attributes():
2668 if not hasattr(os, "setxattr"):
2669 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002670
Larry Hastings9cf065c2012-06-22 16:30:09 -07002671 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002672 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002673 try:
2674 os.setxattr(fp.fileno(), b"user.test", b"")
2675 except OSError:
2676 return False
2677 finally:
2678 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002679
2680 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002681
2682
2683@unittest.skipUnless(supports_extended_attributes(),
2684 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002685# Kernels < 2.6.39 don't respect setxattr flags.
2686@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002687class ExtendedAttributeTests(unittest.TestCase):
2688
Larry Hastings9cf065c2012-06-22 16:30:09 -07002689 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002690 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002691 self.addCleanup(support.unlink, fn)
2692 create_file(fn)
2693
Benjamin Peterson799bd802011-08-31 22:15:17 -04002694 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002695 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002696 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002697
Victor Stinnerf12e5062011-10-16 22:12:03 +02002698 init_xattr = listxattr(fn)
2699 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002700
Larry Hastings9cf065c2012-06-22 16:30:09 -07002701 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002702 xattr = set(init_xattr)
2703 xattr.add("user.test")
2704 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002705 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2706 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2707 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002708
Benjamin Peterson799bd802011-08-31 22:15:17 -04002709 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002710 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002711 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002712
Benjamin Peterson799bd802011-08-31 22:15:17 -04002713 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002714 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002715 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002716
Larry Hastings9cf065c2012-06-22 16:30:09 -07002717 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002718 xattr.add("user.test2")
2719 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002720 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002721
Benjamin Peterson799bd802011-08-31 22:15:17 -04002722 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002723 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002724 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002725
Victor Stinnerf12e5062011-10-16 22:12:03 +02002726 xattr.remove("user.test")
2727 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002728 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2729 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2730 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2731 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002732 many = sorted("user.test{}".format(i) for i in range(100))
2733 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002734 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002735 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002736
Larry Hastings9cf065c2012-06-22 16:30:09 -07002737 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002738 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002739 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002740
2741 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2742 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002743
2744 def test_simple(self):
2745 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2746 os.listxattr)
2747
2748 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002749 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2750 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002751
2752 def test_fds(self):
2753 def getxattr(path, *args):
2754 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002755 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002756 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002757 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002758 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002759 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002760 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002761 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002762 def listxattr(path, *args):
2763 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002764 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002765 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2766
2767
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002768@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2769class TermsizeTests(unittest.TestCase):
2770 def test_does_not_crash(self):
2771 """Check if get_terminal_size() returns a meaningful value.
2772
2773 There's no easy portable way to actually check the size of the
2774 terminal, so let's check if it returns something sensible instead.
2775 """
2776 try:
2777 size = os.get_terminal_size()
2778 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002779 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002780 # Under win32 a generic OSError can be thrown if the
2781 # handle cannot be retrieved
2782 self.skipTest("failed to query terminal size")
2783 raise
2784
Antoine Pitroucfade362012-02-08 23:48:59 +01002785 self.assertGreaterEqual(size.columns, 0)
2786 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002787
2788 def test_stty_match(self):
2789 """Check if stty returns the same results
2790
2791 stty actually tests stdin, so get_terminal_size is invoked on
2792 stdin explicitly. If stty succeeded, then get_terminal_size()
2793 should work too.
2794 """
2795 try:
2796 size = subprocess.check_output(['stty', 'size']).decode().split()
2797 except (FileNotFoundError, subprocess.CalledProcessError):
2798 self.skipTest("stty invocation failed")
2799 expected = (int(size[1]), int(size[0])) # reversed order
2800
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002801 try:
2802 actual = os.get_terminal_size(sys.__stdin__.fileno())
2803 except OSError as e:
2804 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2805 # Under win32 a generic OSError can be thrown if the
2806 # handle cannot be retrieved
2807 self.skipTest("failed to query terminal size")
2808 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002809 self.assertEqual(expected, actual)
2810
2811
Victor Stinner292c8352012-10-30 02:17:38 +01002812class OSErrorTests(unittest.TestCase):
2813 def setUp(self):
2814 class Str(str):
2815 pass
2816
Victor Stinnerafe17062012-10-31 22:47:43 +01002817 self.bytes_filenames = []
2818 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002819 if support.TESTFN_UNENCODABLE is not None:
2820 decoded = support.TESTFN_UNENCODABLE
2821 else:
2822 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002823 self.unicode_filenames.append(decoded)
2824 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002825 if support.TESTFN_UNDECODABLE is not None:
2826 encoded = support.TESTFN_UNDECODABLE
2827 else:
2828 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002829 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002830 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002831 self.bytes_filenames.append(memoryview(encoded))
2832
2833 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002834
2835 def test_oserror_filename(self):
2836 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002837 (self.filenames, os.chdir,),
2838 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002839 (self.filenames, os.lstat,),
2840 (self.filenames, os.open, os.O_RDONLY),
2841 (self.filenames, os.rmdir,),
2842 (self.filenames, os.stat,),
2843 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002844 ]
2845 if sys.platform == "win32":
2846 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002847 (self.bytes_filenames, os.rename, b"dst"),
2848 (self.bytes_filenames, os.replace, b"dst"),
2849 (self.unicode_filenames, os.rename, "dst"),
2850 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002851 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002852 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002853 else:
2854 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002855 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002856 (self.filenames, os.rename, "dst"),
2857 (self.filenames, os.replace, "dst"),
2858 ))
2859 if hasattr(os, "chown"):
2860 funcs.append((self.filenames, os.chown, 0, 0))
2861 if hasattr(os, "lchown"):
2862 funcs.append((self.filenames, os.lchown, 0, 0))
2863 if hasattr(os, "truncate"):
2864 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002865 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002866 funcs.append((self.filenames, os.chflags, 0))
2867 if hasattr(os, "lchflags"):
2868 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002869 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002870 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002871 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002872 if sys.platform == "win32":
2873 funcs.append((self.bytes_filenames, os.link, b"dst"))
2874 funcs.append((self.unicode_filenames, os.link, "dst"))
2875 else:
2876 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002877 if hasattr(os, "listxattr"):
2878 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002879 (self.filenames, os.listxattr,),
2880 (self.filenames, os.getxattr, "user.test"),
2881 (self.filenames, os.setxattr, "user.test", b'user'),
2882 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002883 ))
2884 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002885 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002886 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002887 if sys.platform == "win32":
2888 funcs.append((self.unicode_filenames, os.readlink,))
2889 else:
2890 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002891
Steve Dowercc16be82016-09-08 10:35:16 -07002892
Victor Stinnerafe17062012-10-31 22:47:43 +01002893 for filenames, func, *func_args in funcs:
2894 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002895 try:
Steve Dowercc16be82016-09-08 10:35:16 -07002896 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01002897 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002898 else:
2899 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2900 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002901 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07002902 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08002903 except UnicodeDecodeError:
2904 pass
Victor Stinner292c8352012-10-30 02:17:38 +01002905 else:
2906 self.fail("No exception thrown by {}".format(func))
2907
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002908class CPUCountTests(unittest.TestCase):
2909 def test_cpu_count(self):
2910 cpus = os.cpu_count()
2911 if cpus is not None:
2912 self.assertIsInstance(cpus, int)
2913 self.assertGreater(cpus, 0)
2914 else:
2915 self.skipTest("Could not determine the number of CPUs")
2916
Victor Stinnerdaf45552013-08-28 00:53:59 +02002917
2918class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002919 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002920 fd = os.open(__file__, os.O_RDONLY)
2921 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002922 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002923
Victor Stinnerdaf45552013-08-28 00:53:59 +02002924 os.set_inheritable(fd, True)
2925 self.assertEqual(os.get_inheritable(fd), True)
2926
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002927 @unittest.skipIf(fcntl is None, "need fcntl")
2928 def test_get_inheritable_cloexec(self):
2929 fd = os.open(__file__, os.O_RDONLY)
2930 self.addCleanup(os.close, fd)
2931 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002932
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002933 # clear FD_CLOEXEC flag
2934 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2935 flags &= ~fcntl.FD_CLOEXEC
2936 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002937
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002938 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002939
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002940 @unittest.skipIf(fcntl is None, "need fcntl")
2941 def test_set_inheritable_cloexec(self):
2942 fd = os.open(__file__, os.O_RDONLY)
2943 self.addCleanup(os.close, fd)
2944 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2945 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002946
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002947 os.set_inheritable(fd, True)
2948 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2949 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002950
Victor Stinnerdaf45552013-08-28 00:53:59 +02002951 def test_open(self):
2952 fd = os.open(__file__, os.O_RDONLY)
2953 self.addCleanup(os.close, fd)
2954 self.assertEqual(os.get_inheritable(fd), False)
2955
2956 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2957 def test_pipe(self):
2958 rfd, wfd = os.pipe()
2959 self.addCleanup(os.close, rfd)
2960 self.addCleanup(os.close, wfd)
2961 self.assertEqual(os.get_inheritable(rfd), False)
2962 self.assertEqual(os.get_inheritable(wfd), False)
2963
2964 def test_dup(self):
2965 fd1 = os.open(__file__, os.O_RDONLY)
2966 self.addCleanup(os.close, fd1)
2967
2968 fd2 = os.dup(fd1)
2969 self.addCleanup(os.close, fd2)
2970 self.assertEqual(os.get_inheritable(fd2), False)
2971
2972 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2973 def test_dup2(self):
2974 fd = os.open(__file__, os.O_RDONLY)
2975 self.addCleanup(os.close, fd)
2976
2977 # inheritable by default
2978 fd2 = os.open(__file__, os.O_RDONLY)
2979 try:
2980 os.dup2(fd, fd2)
2981 self.assertEqual(os.get_inheritable(fd2), True)
2982 finally:
2983 os.close(fd2)
2984
2985 # force non-inheritable
2986 fd3 = os.open(__file__, os.O_RDONLY)
2987 try:
2988 os.dup2(fd, fd3, inheritable=False)
2989 self.assertEqual(os.get_inheritable(fd3), False)
2990 finally:
2991 os.close(fd3)
2992
2993 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2994 def test_openpty(self):
2995 master_fd, slave_fd = os.openpty()
2996 self.addCleanup(os.close, master_fd)
2997 self.addCleanup(os.close, slave_fd)
2998 self.assertEqual(os.get_inheritable(master_fd), False)
2999 self.assertEqual(os.get_inheritable(slave_fd), False)
3000
3001
Brett Cannon3f9183b2016-08-26 14:44:48 -07003002class PathTConverterTests(unittest.TestCase):
3003 # tuples of (function name, allows fd arguments, additional arguments to
3004 # function, cleanup function)
3005 functions = [
3006 ('stat', True, (), None),
3007 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003008 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003009 ('chflags', False, (0,), None),
3010 ('lchflags', False, (0,), None),
3011 ('open', False, (0,), getattr(os, 'close', None)),
3012 ]
3013
3014 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003015 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003016 if os.name == 'nt':
3017 bytes_fspath = bytes_filename = None
3018 else:
3019 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003020 bytes_fspath = _PathLike(bytes_filename)
3021 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003022 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003023 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003024
Brett Cannonec6ce872016-09-06 15:50:29 -07003025 int_fspath = _PathLike(fd)
3026 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003027
3028 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3029 with self.subTest(name=name):
3030 try:
3031 fn = getattr(os, name)
3032 except AttributeError:
3033 continue
3034
Brett Cannon8f96a302016-08-26 19:30:11 -07003035 for path in (str_filename, bytes_filename, str_fspath,
3036 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003037 if path is None:
3038 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003039 with self.subTest(name=name, path=path):
3040 result = fn(path, *extra_args)
3041 if cleanup_fn is not None:
3042 cleanup_fn(result)
3043
3044 with self.assertRaisesRegex(
3045 TypeError, 'should be string, bytes'):
3046 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003047
3048 if allow_fd:
3049 result = fn(fd, *extra_args) # should not fail
3050 if cleanup_fn is not None:
3051 cleanup_fn(result)
3052 else:
3053 with self.assertRaisesRegex(
3054 TypeError,
3055 'os.PathLike'):
3056 fn(fd, *extra_args)
3057
3058
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003059@unittest.skipUnless(hasattr(os, 'get_blocking'),
3060 'needs os.get_blocking() and os.set_blocking()')
3061class BlockingTests(unittest.TestCase):
3062 def test_blocking(self):
3063 fd = os.open(__file__, os.O_RDONLY)
3064 self.addCleanup(os.close, fd)
3065 self.assertEqual(os.get_blocking(fd), True)
3066
3067 os.set_blocking(fd, False)
3068 self.assertEqual(os.get_blocking(fd), False)
3069
3070 os.set_blocking(fd, True)
3071 self.assertEqual(os.get_blocking(fd), True)
3072
3073
Yury Selivanov97e2e062014-09-26 12:33:06 -04003074
3075class ExportsTests(unittest.TestCase):
3076 def test_os_all(self):
3077 self.assertIn('open', os.__all__)
3078 self.assertIn('walk', os.__all__)
3079
3080
Victor Stinner6036e442015-03-08 01:58:04 +01003081class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003082 check_no_resource_warning = support.check_no_resource_warning
3083
Victor Stinner6036e442015-03-08 01:58:04 +01003084 def setUp(self):
3085 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003086 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003087 self.addCleanup(support.rmtree, self.path)
3088 os.mkdir(self.path)
3089
3090 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003091 path = self.bytes_path if isinstance(name, bytes) else self.path
3092 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003093 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003094 return filename
3095
3096 def get_entries(self, names):
3097 entries = dict((entry.name, entry)
3098 for entry in os.scandir(self.path))
3099 self.assertEqual(sorted(entries.keys()), names)
3100 return entries
3101
3102 def assert_stat_equal(self, stat1, stat2, skip_fields):
3103 if skip_fields:
3104 for attr in dir(stat1):
3105 if not attr.startswith("st_"):
3106 continue
3107 if attr in ("st_dev", "st_ino", "st_nlink"):
3108 continue
3109 self.assertEqual(getattr(stat1, attr),
3110 getattr(stat2, attr),
3111 (stat1, stat2, attr))
3112 else:
3113 self.assertEqual(stat1, stat2)
3114
3115 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003116 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003117 self.assertEqual(entry.name, name)
3118 self.assertEqual(entry.path, os.path.join(self.path, name))
3119 self.assertEqual(entry.inode(),
3120 os.stat(entry.path, follow_symlinks=False).st_ino)
3121
3122 entry_stat = os.stat(entry.path)
3123 self.assertEqual(entry.is_dir(),
3124 stat.S_ISDIR(entry_stat.st_mode))
3125 self.assertEqual(entry.is_file(),
3126 stat.S_ISREG(entry_stat.st_mode))
3127 self.assertEqual(entry.is_symlink(),
3128 os.path.islink(entry.path))
3129
3130 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3131 self.assertEqual(entry.is_dir(follow_symlinks=False),
3132 stat.S_ISDIR(entry_lstat.st_mode))
3133 self.assertEqual(entry.is_file(follow_symlinks=False),
3134 stat.S_ISREG(entry_lstat.st_mode))
3135
3136 self.assert_stat_equal(entry.stat(),
3137 entry_stat,
3138 os.name == 'nt' and not is_symlink)
3139 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3140 entry_lstat,
3141 os.name == 'nt')
3142
3143 def test_attributes(self):
3144 link = hasattr(os, 'link')
3145 symlink = support.can_symlink()
3146
3147 dirname = os.path.join(self.path, "dir")
3148 os.mkdir(dirname)
3149 filename = self.create_file("file.txt")
3150 if link:
3151 os.link(filename, os.path.join(self.path, "link_file.txt"))
3152 if symlink:
3153 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3154 target_is_directory=True)
3155 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3156
3157 names = ['dir', 'file.txt']
3158 if link:
3159 names.append('link_file.txt')
3160 if symlink:
3161 names.extend(('symlink_dir', 'symlink_file.txt'))
3162 entries = self.get_entries(names)
3163
3164 entry = entries['dir']
3165 self.check_entry(entry, 'dir', True, False, False)
3166
3167 entry = entries['file.txt']
3168 self.check_entry(entry, 'file.txt', False, True, False)
3169
3170 if link:
3171 entry = entries['link_file.txt']
3172 self.check_entry(entry, 'link_file.txt', False, True, False)
3173
3174 if symlink:
3175 entry = entries['symlink_dir']
3176 self.check_entry(entry, 'symlink_dir', True, False, True)
3177
3178 entry = entries['symlink_file.txt']
3179 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3180
3181 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003182 path = self.bytes_path if isinstance(name, bytes) else self.path
3183 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003184 self.assertEqual(len(entries), 1)
3185
3186 entry = entries[0]
3187 self.assertEqual(entry.name, name)
3188 return entry
3189
Brett Cannon96881cd2016-06-10 14:37:21 -07003190 def create_file_entry(self, name='file.txt'):
3191 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003192 return self.get_entry(os.path.basename(filename))
3193
3194 def test_current_directory(self):
3195 filename = self.create_file()
3196 old_dir = os.getcwd()
3197 try:
3198 os.chdir(self.path)
3199
3200 # call scandir() without parameter: it must list the content
3201 # of the current directory
3202 entries = dict((entry.name, entry) for entry in os.scandir())
3203 self.assertEqual(sorted(entries.keys()),
3204 [os.path.basename(filename)])
3205 finally:
3206 os.chdir(old_dir)
3207
3208 def test_repr(self):
3209 entry = self.create_file_entry()
3210 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3211
Brett Cannon96881cd2016-06-10 14:37:21 -07003212 def test_fspath_protocol(self):
3213 entry = self.create_file_entry()
3214 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3215
3216 def test_fspath_protocol_bytes(self):
3217 bytes_filename = os.fsencode('bytesfile.txt')
3218 bytes_entry = self.create_file_entry(name=bytes_filename)
3219 fspath = os.fspath(bytes_entry)
3220 self.assertIsInstance(fspath, bytes)
3221 self.assertEqual(fspath,
3222 os.path.join(os.fsencode(self.path),bytes_filename))
3223
Victor Stinner6036e442015-03-08 01:58:04 +01003224 def test_removed_dir(self):
3225 path = os.path.join(self.path, 'dir')
3226
3227 os.mkdir(path)
3228 entry = self.get_entry('dir')
3229 os.rmdir(path)
3230
3231 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3232 if os.name == 'nt':
3233 self.assertTrue(entry.is_dir())
3234 self.assertFalse(entry.is_file())
3235 self.assertFalse(entry.is_symlink())
3236 if os.name == 'nt':
3237 self.assertRaises(FileNotFoundError, entry.inode)
3238 # don't fail
3239 entry.stat()
3240 entry.stat(follow_symlinks=False)
3241 else:
3242 self.assertGreater(entry.inode(), 0)
3243 self.assertRaises(FileNotFoundError, entry.stat)
3244 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3245
3246 def test_removed_file(self):
3247 entry = self.create_file_entry()
3248 os.unlink(entry.path)
3249
3250 self.assertFalse(entry.is_dir())
3251 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3252 if os.name == 'nt':
3253 self.assertTrue(entry.is_file())
3254 self.assertFalse(entry.is_symlink())
3255 if os.name == 'nt':
3256 self.assertRaises(FileNotFoundError, entry.inode)
3257 # don't fail
3258 entry.stat()
3259 entry.stat(follow_symlinks=False)
3260 else:
3261 self.assertGreater(entry.inode(), 0)
3262 self.assertRaises(FileNotFoundError, entry.stat)
3263 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3264
3265 def test_broken_symlink(self):
3266 if not support.can_symlink():
3267 return self.skipTest('cannot create symbolic link')
3268
3269 filename = self.create_file("file.txt")
3270 os.symlink(filename,
3271 os.path.join(self.path, "symlink.txt"))
3272 entries = self.get_entries(['file.txt', 'symlink.txt'])
3273 entry = entries['symlink.txt']
3274 os.unlink(filename)
3275
3276 self.assertGreater(entry.inode(), 0)
3277 self.assertFalse(entry.is_dir())
3278 self.assertFalse(entry.is_file()) # broken symlink returns False
3279 self.assertFalse(entry.is_dir(follow_symlinks=False))
3280 self.assertFalse(entry.is_file(follow_symlinks=False))
3281 self.assertTrue(entry.is_symlink())
3282 self.assertRaises(FileNotFoundError, entry.stat)
3283 # don't fail
3284 entry.stat(follow_symlinks=False)
3285
3286 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003287 self.create_file("file.txt")
3288
3289 path_bytes = os.fsencode(self.path)
3290 entries = list(os.scandir(path_bytes))
3291 self.assertEqual(len(entries), 1, entries)
3292 entry = entries[0]
3293
3294 self.assertEqual(entry.name, b'file.txt')
3295 self.assertEqual(entry.path,
3296 os.fsencode(os.path.join(self.path, 'file.txt')))
3297
3298 def test_empty_path(self):
3299 self.assertRaises(FileNotFoundError, os.scandir, '')
3300
3301 def test_consume_iterator_twice(self):
3302 self.create_file("file.txt")
3303 iterator = os.scandir(self.path)
3304
3305 entries = list(iterator)
3306 self.assertEqual(len(entries), 1, entries)
3307
3308 # check than consuming the iterator twice doesn't raise exception
3309 entries2 = list(iterator)
3310 self.assertEqual(len(entries2), 0, entries2)
3311
3312 def test_bad_path_type(self):
3313 for obj in [1234, 1.234, {}, []]:
3314 self.assertRaises(TypeError, os.scandir, obj)
3315
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003316 def test_close(self):
3317 self.create_file("file.txt")
3318 self.create_file("file2.txt")
3319 iterator = os.scandir(self.path)
3320 next(iterator)
3321 iterator.close()
3322 # multiple closes
3323 iterator.close()
3324 with self.check_no_resource_warning():
3325 del iterator
3326
3327 def test_context_manager(self):
3328 self.create_file("file.txt")
3329 self.create_file("file2.txt")
3330 with os.scandir(self.path) as iterator:
3331 next(iterator)
3332 with self.check_no_resource_warning():
3333 del iterator
3334
3335 def test_context_manager_close(self):
3336 self.create_file("file.txt")
3337 self.create_file("file2.txt")
3338 with os.scandir(self.path) as iterator:
3339 next(iterator)
3340 iterator.close()
3341
3342 def test_context_manager_exception(self):
3343 self.create_file("file.txt")
3344 self.create_file("file2.txt")
3345 with self.assertRaises(ZeroDivisionError):
3346 with os.scandir(self.path) as iterator:
3347 next(iterator)
3348 1/0
3349 with self.check_no_resource_warning():
3350 del iterator
3351
3352 def test_resource_warning(self):
3353 self.create_file("file.txt")
3354 self.create_file("file2.txt")
3355 iterator = os.scandir(self.path)
3356 next(iterator)
3357 with self.assertWarns(ResourceWarning):
3358 del iterator
3359 support.gc_collect()
3360 # exhausted iterator
3361 iterator = os.scandir(self.path)
3362 list(iterator)
3363 with self.check_no_resource_warning():
3364 del iterator
3365
Victor Stinner6036e442015-03-08 01:58:04 +01003366
Ethan Furmancdc08792016-06-02 15:06:09 -07003367class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003368
3369 # Abstracted so it can be overridden to test pure Python implementation
3370 # if a C version is provided.
3371 fspath = staticmethod(os.fspath)
3372
Ethan Furmancdc08792016-06-02 15:06:09 -07003373 def test_return_bytes(self):
3374 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003375 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003376
3377 def test_return_string(self):
3378 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003379 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003380
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003381 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003382 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003383 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003384
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003385 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003386 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3387 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3388
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003389 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003390 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3391 self.assertTrue(issubclass(_PathLike, os.PathLike))
3392 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003393
Ethan Furmancdc08792016-06-02 15:06:09 -07003394 def test_garbage_in_exception_out(self):
3395 vapor = type('blah', (), {})
3396 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003397 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003398
3399 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003400 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003401
Brett Cannon044283a2016-07-15 10:41:49 -07003402 def test_bad_pathlike(self):
3403 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003404 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003405 # __fspath__ attribute that is not callable.
3406 c = type('foo', (), {})
3407 c.__fspath__ = 1
3408 self.assertRaises(TypeError, self.fspath, c())
3409 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003410 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003411 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003412
3413# Only test if the C version is provided, otherwise TestPEP519 already tested
3414# the pure Python implementation.
3415if hasattr(os, "_fspath"):
3416 class TestPEP519PurePython(TestPEP519):
3417
3418 """Explicitly test the pure Python implementation of os.fspath()."""
3419
3420 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003421
3422
Fred Drake2e2be372001-09-20 21:33:42 +00003423if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003424 unittest.main()