blob: 746b3f8be8e0bfc2d11fd8fd8ea8717dcf33514e [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Mark Dickinson7cf03892010-04-16 13:45:35 +000073# Detect whether we're on a Linux system that uses the (now outdated
74# and unmaintained) linuxthreads threading library. There's an issue
75# when combining linuxthreads with a failed execv call: see
76# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020077if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79else:
80 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000081
Stefan Krahebee49a2013-01-17 15:31:00 +010082# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
86@contextlib.contextmanager
87def ignore_deprecation_warnings(msg_regex, quiet=False):
88 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
89 yield
90
91
Berker Peksag4af23d72016-09-15 20:32:44 +030092def requires_os_func(name):
93 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
94
95
Brett Cannonec6ce872016-09-06 15:50:29 -070096class _PathLike(os.PathLike):
97
98 def __init__(self, path=""):
99 self.path = path
100
101 def __str__(self):
102 return str(self.path)
103
104 def __fspath__(self):
105 if isinstance(self.path, BaseException):
106 raise self.path
107 else:
108 return self.path
109
110
Victor Stinnerae39d232016-03-24 17:12:55 +0100111def create_file(filename, content=b'content'):
112 with open(filename, "xb", 0) as fp:
113 fp.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
117class FileTests(unittest.TestCase):
118 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000119 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000120 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121 tearDown = setUp
122
123 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000124 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000126 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000127
Christian Heimesfdab48e2008-01-20 09:06:41 +0000128 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000129 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
130 # We must allocate two consecutive file descriptors, otherwise
131 # it will mess up other file descriptors (perhaps even the three
132 # standard ones).
133 second = os.dup(first)
134 try:
135 retries = 0
136 while second != first + 1:
137 os.close(first)
138 retries += 1
139 if retries > 10:
140 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000141 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000142 first, second = second, os.dup(second)
143 finally:
144 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000145 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000146 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000147 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000148
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000149 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000150 def test_rename(self):
151 path = support.TESTFN
152 old = sys.getrefcount(path)
153 self.assertRaises(TypeError, os.rename, path, 0)
154 new = sys.getrefcount(path)
155 self.assertEqual(old, new)
156
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000157 def test_read(self):
158 with open(support.TESTFN, "w+b") as fobj:
159 fobj.write(b"spam")
160 fobj.flush()
161 fd = fobj.fileno()
162 os.lseek(fd, 0, 0)
163 s = os.read(fd, 4)
164 self.assertEqual(type(s), bytes)
165 self.assertEqual(s, b"spam")
166
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200167 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200168 # Skip the test on 32-bit platforms: the number of bytes must fit in a
169 # Py_ssize_t type
170 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
171 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200172 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
173 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200174 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100175 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200176
177 # Issue #21932: Make sure that os.read() does not raise an
178 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200179 with open(support.TESTFN, "rb") as fp:
180 data = os.read(fp.fileno(), size)
181
182 # The test does not try to read more than 2 GB at once because the
183 # operating system is free to return less bytes than requested.
184 self.assertEqual(data, b'test')
185
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000186 def test_write(self):
187 # os.write() accepts bytes- and buffer-like objects but not strings
188 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
189 self.assertRaises(TypeError, os.write, fd, "beans")
190 os.write(fd, b"bacon\n")
191 os.write(fd, bytearray(b"eggs\n"))
192 os.write(fd, memoryview(b"spam\n"))
193 os.close(fd)
194 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000195 self.assertEqual(fobj.read().splitlines(),
196 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000197
Victor Stinnere0daff12011-03-20 23:36:35 +0100198 def write_windows_console(self, *args):
199 retcode = subprocess.call(args,
200 # use a new console to not flood the test output
201 creationflags=subprocess.CREATE_NEW_CONSOLE,
202 # use a shell to hide the console window (SW_HIDE)
203 shell=True)
204 self.assertEqual(retcode, 0)
205
206 @unittest.skipUnless(sys.platform == 'win32',
207 'test specific to the Windows console')
208 def test_write_windows_console(self):
209 # Issue #11395: the Windows console returns an error (12: not enough
210 # space error) on writing into stdout if stdout mode is binary and the
211 # length is greater than 66,000 bytes (or less, depending on heap
212 # usage).
213 code = "print('x' * 100000)"
214 self.write_windows_console(sys.executable, "-c", code)
215 self.write_windows_console(sys.executable, "-u", "-c", code)
216
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000217 def fdopen_helper(self, *args):
218 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200219 f = os.fdopen(fd, *args)
220 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000221
222 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200223 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
224 os.close(fd)
225
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000226 self.fdopen_helper()
227 self.fdopen_helper('r')
228 self.fdopen_helper('r', 100)
229
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100230 def test_replace(self):
231 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100232 self.addCleanup(support.unlink, support.TESTFN)
233 self.addCleanup(support.unlink, TESTFN2)
234
235 create_file(support.TESTFN, b"1")
236 create_file(TESTFN2, b"2")
237
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100238 os.replace(support.TESTFN, TESTFN2)
239 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
240 with open(TESTFN2, 'r') as f:
241 self.assertEqual(f.read(), "1")
242
Martin Panterbf19d162015-09-09 01:01:13 +0000243 def test_open_keywords(self):
244 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
245 dir_fd=None)
246 os.close(f)
247
248 def test_symlink_keywords(self):
249 symlink = support.get_attribute(os, "symlink")
250 try:
251 symlink(src='target', dst=support.TESTFN,
252 target_is_directory=False, dir_fd=None)
253 except (NotImplementedError, OSError):
254 pass # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
258class StatAttributeTests(unittest.TestCase):
259 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200260 self.fname = support.TESTFN
261 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100262 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Serhiy Storchaka43767632013-11-03 21:31:38 +0200264 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000265 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000266 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267
268 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(result[stat.ST_SIZE], 3)
270 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 # Make sure all the attributes are there
273 members = dir(result)
274 for name in dir(stat):
275 if name[:3] == 'ST_':
276 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000277 if name.endswith("TIME"):
278 def trunc(x): return int(x)
279 else:
280 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000281 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000283 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284
Larry Hastings6fe20b32012-04-19 15:07:49 -0700285 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700286 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700287 for name in 'st_atime st_mtime st_ctime'.split():
288 floaty = int(getattr(result, name) * 100000)
289 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700290 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700291
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000292 try:
293 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200294 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 except IndexError:
296 pass
297
298 # Make sure that assignment fails
299 try:
300 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200301 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000302 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000303 pass
304
305 try:
306 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200307 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000308 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000309 pass
310
311 try:
312 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200313 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000314 except AttributeError:
315 pass
316
317 # Use the stat_result constructor with a too-short tuple.
318 try:
319 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200320 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000321 except TypeError:
322 pass
323
Ezio Melotti42da6632011-03-15 05:18:48 +0200324 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000325 try:
326 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
327 except TypeError:
328 pass
329
Antoine Pitrou38425292010-09-21 18:19:07 +0000330 def test_stat_attributes(self):
331 self.check_stat_attributes(self.fname)
332
333 def test_stat_attributes_bytes(self):
334 try:
335 fname = self.fname.encode(sys.getfilesystemencoding())
336 except UnicodeEncodeError:
337 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700338 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000339
Christian Heimes25827622013-10-12 01:27:08 +0200340 def test_stat_result_pickle(self):
341 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200342 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
343 p = pickle.dumps(result, proto)
344 self.assertIn(b'stat_result', p)
345 if proto < 4:
346 self.assertIn(b'cos\nstat_result\n', p)
347 unpickled = pickle.loads(p)
348 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200349
Serhiy Storchaka43767632013-11-03 21:31:38 +0200350 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000351 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000352 try:
353 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000354 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000355 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000356 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200357 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000358
359 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000360 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000362 # Make sure all the attributes are there.
363 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
364 'ffree', 'favail', 'flag', 'namemax')
365 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000366 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000367
368 # Make sure that assignment really fails
369 try:
370 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200371 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000372 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000373 pass
374
375 try:
376 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200377 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000378 except AttributeError:
379 pass
380
381 # Use the constructor with a too-short tuple.
382 try:
383 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200384 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000385 except TypeError:
386 pass
387
Ezio Melotti42da6632011-03-15 05:18:48 +0200388 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000389 try:
390 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
391 except TypeError:
392 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000393
Christian Heimes25827622013-10-12 01:27:08 +0200394 @unittest.skipUnless(hasattr(os, 'statvfs'),
395 "need os.statvfs()")
396 def test_statvfs_result_pickle(self):
397 try:
398 result = os.statvfs(self.fname)
399 except OSError as e:
400 # On AtheOS, glibc always returns ENOSYS
401 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200402 self.skipTest('os.statvfs() failed with ENOSYS')
403
Serhiy Storchakabad12572014-12-15 14:03:42 +0200404 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
405 p = pickle.dumps(result, proto)
406 self.assertIn(b'statvfs_result', p)
407 if proto < 4:
408 self.assertIn(b'cos\nstatvfs_result\n', p)
409 unpickled = pickle.loads(p)
410 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200411
Serhiy Storchaka43767632013-11-03 21:31:38 +0200412 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
413 def test_1686475(self):
414 # Verify that an open file can be stat'ed
415 try:
416 os.stat(r"c:\pagefile.sys")
417 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600418 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200419 except OSError as e:
420 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000421
Serhiy Storchaka43767632013-11-03 21:31:38 +0200422 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
423 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
424 def test_15261(self):
425 # Verify that stat'ing a closed fd does not cause crash
426 r, w = os.pipe()
427 try:
428 os.stat(r) # should not raise error
429 finally:
430 os.close(r)
431 os.close(w)
432 with self.assertRaises(OSError) as ctx:
433 os.stat(r)
434 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100435
Zachary Ware63f277b2014-06-19 09:46:37 -0500436 def check_file_attributes(self, result):
437 self.assertTrue(hasattr(result, 'st_file_attributes'))
438 self.assertTrue(isinstance(result.st_file_attributes, int))
439 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
440
441 @unittest.skipUnless(sys.platform == "win32",
442 "st_file_attributes is Win32 specific")
443 def test_file_attributes(self):
444 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
445 result = os.stat(self.fname)
446 self.check_file_attributes(result)
447 self.assertEqual(
448 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
449 0)
450
451 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200452 dirname = support.TESTFN + "dir"
453 os.mkdir(dirname)
454 self.addCleanup(os.rmdir, dirname)
455
456 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500457 self.check_file_attributes(result)
458 self.assertEqual(
459 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
460 stat.FILE_ATTRIBUTE_DIRECTORY)
461
Berker Peksag0b4dc482016-09-17 15:49:59 +0300462 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
463 def test_access_denied(self):
464 # Default to FindFirstFile WIN32_FIND_DATA when access is
465 # denied. See issue 28075.
466 # os.environ['TEMP'] should be located on a volume that
467 # supports file ACLs.
468 fname = os.path.join(os.environ['TEMP'], self.fname)
469 self.addCleanup(support.unlink, fname)
470 create_file(fname, b'ABC')
471 # Deny the right to [S]YNCHRONIZE on the file to
472 # force CreateFile to fail with ERROR_ACCESS_DENIED.
473 DETACHED_PROCESS = 8
474 subprocess.check_call(
475 ['icacls.exe', fname, '/deny', 'Users:(S)'],
476 creationflags=DETACHED_PROCESS
477 )
478 result = os.stat(fname)
479 self.assertNotEqual(result.st_size, 0)
480
Victor Stinner47aacc82015-06-12 17:26:23 +0200481
482class UtimeTests(unittest.TestCase):
483 def setUp(self):
484 self.dirname = support.TESTFN
485 self.fname = os.path.join(self.dirname, "f1")
486
487 self.addCleanup(support.rmtree, self.dirname)
488 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100489 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200490
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200491 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100492 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200493 os.stat_float_times(state)
494
Victor Stinner47aacc82015-06-12 17:26:23 +0200495 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100496 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200497 old_float_times = os.stat_float_times(-1)
498 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200499
500 os.stat_float_times(True)
501
502 def support_subsecond(self, filename):
503 # Heuristic to check if the filesystem supports timestamp with
504 # subsecond resolution: check if float and int timestamps are different
505 st = os.stat(filename)
506 return ((st.st_atime != st[7])
507 or (st.st_mtime != st[8])
508 or (st.st_ctime != st[9]))
509
510 def _test_utime(self, set_time, filename=None):
511 if not filename:
512 filename = self.fname
513
514 support_subsecond = self.support_subsecond(filename)
515 if support_subsecond:
516 # Timestamp with a resolution of 1 microsecond (10^-6).
517 #
518 # The resolution of the C internal function used by os.utime()
519 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
520 # test with a resolution of 1 ns requires more work:
521 # see the issue #15745.
522 atime_ns = 1002003000 # 1.002003 seconds
523 mtime_ns = 4005006000 # 4.005006 seconds
524 else:
525 # use a resolution of 1 second
526 atime_ns = 5 * 10**9
527 mtime_ns = 8 * 10**9
528
529 set_time(filename, (atime_ns, mtime_ns))
530 st = os.stat(filename)
531
532 if support_subsecond:
533 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
534 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
535 else:
536 self.assertEqual(st.st_atime, atime_ns * 1e-9)
537 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
538 self.assertEqual(st.st_atime_ns, atime_ns)
539 self.assertEqual(st.st_mtime_ns, mtime_ns)
540
541 def test_utime(self):
542 def set_time(filename, ns):
543 # test the ns keyword parameter
544 os.utime(filename, ns=ns)
545 self._test_utime(set_time)
546
547 @staticmethod
548 def ns_to_sec(ns):
549 # Convert a number of nanosecond (int) to a number of seconds (float).
550 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
551 # issue, os.utime() rounds towards minus infinity.
552 return (ns * 1e-9) + 0.5e-9
553
554 def test_utime_by_indexed(self):
555 # pass times as floating point seconds as the second indexed parameter
556 def set_time(filename, ns):
557 atime_ns, mtime_ns = ns
558 atime = self.ns_to_sec(atime_ns)
559 mtime = self.ns_to_sec(mtime_ns)
560 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
561 # or utime(time_t)
562 os.utime(filename, (atime, mtime))
563 self._test_utime(set_time)
564
565 def test_utime_by_times(self):
566 def set_time(filename, ns):
567 atime_ns, mtime_ns = ns
568 atime = self.ns_to_sec(atime_ns)
569 mtime = self.ns_to_sec(mtime_ns)
570 # test the times keyword parameter
571 os.utime(filename, times=(atime, mtime))
572 self._test_utime(set_time)
573
574 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
575 "follow_symlinks support for utime required "
576 "for this test.")
577 def test_utime_nofollow_symlinks(self):
578 def set_time(filename, ns):
579 # use follow_symlinks=False to test utimensat(timespec)
580 # or lutimes(timeval)
581 os.utime(filename, ns=ns, follow_symlinks=False)
582 self._test_utime(set_time)
583
584 @unittest.skipUnless(os.utime in os.supports_fd,
585 "fd support for utime required for this test.")
586 def test_utime_fd(self):
587 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100588 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200589 # use a file descriptor to test futimens(timespec)
590 # or futimes(timeval)
591 os.utime(fp.fileno(), ns=ns)
592 self._test_utime(set_time)
593
594 @unittest.skipUnless(os.utime in os.supports_dir_fd,
595 "dir_fd support for utime required for this test.")
596 def test_utime_dir_fd(self):
597 def set_time(filename, ns):
598 dirname, name = os.path.split(filename)
599 dirfd = os.open(dirname, os.O_RDONLY)
600 try:
601 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
602 os.utime(name, dir_fd=dirfd, ns=ns)
603 finally:
604 os.close(dirfd)
605 self._test_utime(set_time)
606
607 def test_utime_directory(self):
608 def set_time(filename, ns):
609 # test calling os.utime() on a directory
610 os.utime(filename, ns=ns)
611 self._test_utime(set_time, filename=self.dirname)
612
613 def _test_utime_current(self, set_time):
614 # Get the system clock
615 current = time.time()
616
617 # Call os.utime() to set the timestamp to the current system clock
618 set_time(self.fname)
619
620 if not self.support_subsecond(self.fname):
621 delta = 1.0
622 else:
623 # On Windows, the usual resolution of time.time() is 15.6 ms
624 delta = 0.020
625 st = os.stat(self.fname)
626 msg = ("st_time=%r, current=%r, dt=%r"
627 % (st.st_mtime, current, st.st_mtime - current))
628 self.assertAlmostEqual(st.st_mtime, current,
629 delta=delta, msg=msg)
630
631 def test_utime_current(self):
632 def set_time(filename):
633 # Set to the current time in the new way
634 os.utime(self.fname)
635 self._test_utime_current(set_time)
636
637 def test_utime_current_old(self):
638 def set_time(filename):
639 # Set to the current time in the old explicit way.
640 os.utime(self.fname, None)
641 self._test_utime_current(set_time)
642
643 def get_file_system(self, path):
644 if sys.platform == 'win32':
645 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
646 import ctypes
647 kernel32 = ctypes.windll.kernel32
648 buf = ctypes.create_unicode_buffer("", 100)
649 ok = kernel32.GetVolumeInformationW(root, None, 0,
650 None, None, None,
651 buf, len(buf))
652 if ok:
653 return buf.value
654 # return None if the filesystem is unknown
655
656 def test_large_time(self):
657 # Many filesystems are limited to the year 2038. At least, the test
658 # pass with NTFS filesystem.
659 if self.get_file_system(self.dirname) != "NTFS":
660 self.skipTest("requires NTFS")
661
662 large = 5000000000 # some day in 2128
663 os.utime(self.fname, (large, large))
664 self.assertEqual(os.stat(self.fname).st_mtime, large)
665
666 def test_utime_invalid_arguments(self):
667 # seconds and nanoseconds parameters are mutually exclusive
668 with self.assertRaises(ValueError):
669 os.utime(self.fname, (5, 5), ns=(5, 5))
670
671
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000672from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000673
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000674class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000675 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000676 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000677
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678 def setUp(self):
679 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000680 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000681 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000682 for key, value in self._reference().items():
683 os.environ[key] = value
684
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000685 def tearDown(self):
686 os.environ.clear()
687 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000688 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000689 os.environb.clear()
690 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000691
Christian Heimes90333392007-11-01 19:08:42 +0000692 def _reference(self):
693 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
694
695 def _empty_mapping(self):
696 os.environ.clear()
697 return os.environ
698
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000699 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200700 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
701 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000702 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000703 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300704 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200705 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300706 value = popen.read().strip()
707 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000708
Xavier de Gayed1415312016-07-22 12:15:29 +0200709 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
710 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000711 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200712 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
713 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300714 it = iter(popen)
715 self.assertEqual(next(it), "line1\n")
716 self.assertEqual(next(it), "line2\n")
717 self.assertEqual(next(it), "line3\n")
718 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000719
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000720 # Verify environ keys and values from the OS are of the
721 # correct str type.
722 def test_keyvalue_types(self):
723 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000724 self.assertEqual(type(key), str)
725 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000726
Christian Heimes90333392007-11-01 19:08:42 +0000727 def test_items(self):
728 for key, value in self._reference().items():
729 self.assertEqual(os.environ.get(key), value)
730
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000731 # Issue 7310
732 def test___repr__(self):
733 """Check that the repr() of os.environ looks like environ({...})."""
734 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000735 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
736 '{!r}: {!r}'.format(key, value)
737 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000738
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000739 def test_get_exec_path(self):
740 defpath_list = os.defpath.split(os.pathsep)
741 test_path = ['/monty', '/python', '', '/flying/circus']
742 test_env = {'PATH': os.pathsep.join(test_path)}
743
744 saved_environ = os.environ
745 try:
746 os.environ = dict(test_env)
747 # Test that defaulting to os.environ works.
748 self.assertSequenceEqual(test_path, os.get_exec_path())
749 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
750 finally:
751 os.environ = saved_environ
752
753 # No PATH environment variable
754 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
755 # Empty PATH environment variable
756 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
757 # Supplied PATH environment variable
758 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
759
Victor Stinnerb745a742010-05-18 17:17:23 +0000760 if os.supports_bytes_environ:
761 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000762 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000763 # ignore BytesWarning warning
764 with warnings.catch_warnings(record=True):
765 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000766 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000767 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000768 pass
769 else:
770 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000771
772 # bytes key and/or value
773 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
774 ['abc'])
775 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
776 ['abc'])
777 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
778 ['abc'])
779
780 @unittest.skipUnless(os.supports_bytes_environ,
781 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000782 def test_environb(self):
783 # os.environ -> os.environb
784 value = 'euro\u20ac'
785 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000786 value_bytes = value.encode(sys.getfilesystemencoding(),
787 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000788 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000789 msg = "U+20AC character is not encodable to %s" % (
790 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000791 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000792 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000793 self.assertEqual(os.environ['unicode'], value)
794 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000795
796 # os.environb -> os.environ
797 value = b'\xff'
798 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000799 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000801 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000802
Charles-François Natali2966f102011-11-26 11:32:46 +0100803 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
804 # #13415).
805 @support.requires_freebsd_version(7)
806 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100807 def test_unset_error(self):
808 if sys.platform == "win32":
809 # an environment variable is limited to 32,767 characters
810 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100811 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100812 else:
813 # "=" is not allowed in a variable name
814 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100815 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100816
Victor Stinner6d101392013-04-14 16:35:04 +0200817 def test_key_type(self):
818 missing = 'missingkey'
819 self.assertNotIn(missing, os.environ)
820
Victor Stinner839e5ea2013-04-14 16:43:03 +0200821 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200822 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200823 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200824 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200825
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200827 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200829 self.assertTrue(cm.exception.__suppress_context__)
830
Victor Stinner6d101392013-04-14 16:35:04 +0200831
Tim Petersc4e09402003-04-25 07:11:48 +0000832class WalkTests(unittest.TestCase):
833 """Tests for os.walk()."""
834
Victor Stinner0561c532015-03-12 10:28:24 +0100835 # Wrapper to hide minor differences between os.walk and os.fwalk
836 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200837 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200838 if 'follow_symlinks' in kwargs:
839 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200840 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100841
Charles-François Natali7372b062012-02-05 15:15:38 +0100842 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100843 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100844 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000845
846 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000847 # TESTFN/
848 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000849 # tmp1
850 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000851 # tmp2
852 # SUB11/ no kids
853 # SUB2/ a file kid and a dirsymlink kid
854 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300855 # SUB21/ not readable
856 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000857 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200858 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300859 # broken_link2
860 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000861 # TEST2/
862 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100863 self.walk_path = join(support.TESTFN, "TEST1")
864 self.sub1_path = join(self.walk_path, "SUB1")
865 self.sub11_path = join(self.sub1_path, "SUB11")
866 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300867 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100868 tmp1_path = join(self.walk_path, "tmp1")
869 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000870 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300871 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100872 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000873 t2_path = join(support.TESTFN, "TEST2")
874 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200875 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300876 broken_link2_path = join(sub2_path, "broken_link2")
877 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000878
879 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100880 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000881 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300882 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000883 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100884
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300885 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100886 with open(path, "x") as f:
887 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000888
Victor Stinner0561c532015-03-12 10:28:24 +0100889 if support.can_symlink():
890 os.symlink(os.path.abspath(t2_path), self.link_path)
891 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300892 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
893 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300894 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300895 ["broken_link", "broken_link2", "broken_link3",
896 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100897 else:
898 self.sub2_tree = (sub2_path, [], ["tmp3"])
899
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300900 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300901 try:
902 os.listdir(sub21_path)
903 except PermissionError:
904 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
905 else:
906 os.chmod(sub21_path, stat.S_IRWXU)
907 os.unlink(tmp5_path)
908 os.rmdir(sub21_path)
909 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300910
Victor Stinner0561c532015-03-12 10:28:24 +0100911 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000912 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300913 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100914
Tim Petersc4e09402003-04-25 07:11:48 +0000915 self.assertEqual(len(all), 4)
916 # We can't know which order SUB1 and SUB2 will appear in.
917 # Not flipped: TESTFN, SUB1, SUB11, SUB2
918 # flipped: TESTFN, SUB2, SUB1, SUB11
919 flipped = all[0][1][0] != "SUB1"
920 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200921 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300922 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100923 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
924 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
925 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
926 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000927
Brett Cannon3f9183b2016-08-26 14:44:48 -0700928 def test_walk_prune(self, walk_path=None):
929 if walk_path is None:
930 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000931 # Prune the search.
932 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700933 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000934 all.append((root, dirs, files))
935 # Don't descend into SUB1.
936 if 'SUB1' in dirs:
937 # Note that this also mutates the dirs we appended to all!
938 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000939
Victor Stinner0561c532015-03-12 10:28:24 +0100940 self.assertEqual(len(all), 2)
941 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700942 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100943
944 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300945 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100946 self.assertEqual(all[1], self.sub2_tree)
947
Brett Cannon3f9183b2016-08-26 14:44:48 -0700948 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700949 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700950
Victor Stinner0561c532015-03-12 10:28:24 +0100951 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000952 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100953 all = list(self.walk(self.walk_path, topdown=False))
954
Victor Stinner53b0a412016-03-26 01:12:36 +0100955 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000956 # We can't know which order SUB1 and SUB2 will appear in.
957 # Not flipped: SUB11, SUB1, SUB2, TESTFN
958 # flipped: SUB2, SUB11, SUB1, TESTFN
959 flipped = all[3][1][0] != "SUB1"
960 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200961 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300962 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100963 self.assertEqual(all[3],
964 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
965 self.assertEqual(all[flipped],
966 (self.sub11_path, [], []))
967 self.assertEqual(all[flipped + 1],
968 (self.sub1_path, ["SUB11"], ["tmp2"]))
969 self.assertEqual(all[2 - 2 * flipped],
970 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000971
Victor Stinner0561c532015-03-12 10:28:24 +0100972 def test_walk_symlink(self):
973 if not support.can_symlink():
974 self.skipTest("need symlink support")
975
976 # Walk, following symlinks.
977 walk_it = self.walk(self.walk_path, follow_symlinks=True)
978 for root, dirs, files in walk_it:
979 if root == self.link_path:
980 self.assertEqual(dirs, [])
981 self.assertEqual(files, ["tmp4"])
982 break
983 else:
984 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000985
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200986 def test_walk_bad_dir(self):
987 # Walk top-down.
988 errors = []
989 walk_it = self.walk(self.walk_path, onerror=errors.append)
990 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300991 self.assertEqual(errors, [])
992 dir1 = 'SUB1'
993 path1 = os.path.join(root, dir1)
994 path1new = os.path.join(root, dir1 + '.new')
995 os.rename(path1, path1new)
996 try:
997 roots = [r for r, d, f in walk_it]
998 self.assertTrue(errors)
999 self.assertNotIn(path1, roots)
1000 self.assertNotIn(path1new, roots)
1001 for dir2 in dirs:
1002 if dir2 != dir1:
1003 self.assertIn(os.path.join(root, dir2), roots)
1004 finally:
1005 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001006
Charles-François Natali7372b062012-02-05 15:15:38 +01001007
1008@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1009class FwalkTests(WalkTests):
1010 """Tests for os.fwalk()."""
1011
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001012 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001013 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001014 yield (root, dirs, files)
1015
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001016 def fwalk(self, *args, **kwargs):
1017 return os.fwalk(*args, **kwargs)
1018
Larry Hastingsc48fe982012-06-25 04:49:05 -07001019 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1020 """
1021 compare with walk() results.
1022 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001023 walk_kwargs = walk_kwargs.copy()
1024 fwalk_kwargs = fwalk_kwargs.copy()
1025 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1026 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1027 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001028
Charles-François Natali7372b062012-02-05 15:15:38 +01001029 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001030 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001031 expected[root] = (set(dirs), set(files))
1032
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001033 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001034 self.assertIn(root, expected)
1035 self.assertEqual(expected[root], (set(dirs), set(files)))
1036
Larry Hastingsc48fe982012-06-25 04:49:05 -07001037 def test_compare_to_walk(self):
1038 kwargs = {'top': support.TESTFN}
1039 self._compare_to_walk(kwargs, kwargs)
1040
Charles-François Natali7372b062012-02-05 15:15:38 +01001041 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001042 try:
1043 fd = os.open(".", os.O_RDONLY)
1044 walk_kwargs = {'top': support.TESTFN}
1045 fwalk_kwargs = walk_kwargs.copy()
1046 fwalk_kwargs['dir_fd'] = fd
1047 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1048 finally:
1049 os.close(fd)
1050
1051 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001052 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001053 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1054 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001055 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001056 # check that the FD is valid
1057 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001058 # redundant check
1059 os.stat(rootfd)
1060 # check that listdir() returns consistent information
1061 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001062
1063 def test_fd_leak(self):
1064 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1065 # we both check that calling fwalk() a large number of times doesn't
1066 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1067 minfd = os.dup(1)
1068 os.close(minfd)
1069 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001070 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001071 pass
1072 newfd = os.dup(1)
1073 self.addCleanup(os.close, newfd)
1074 self.assertEqual(newfd, minfd)
1075
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001076class BytesWalkTests(WalkTests):
1077 """Tests for os.walk() with bytes."""
1078 def walk(self, top, **kwargs):
1079 if 'follow_symlinks' in kwargs:
1080 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1081 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1082 root = os.fsdecode(broot)
1083 dirs = list(map(os.fsdecode, bdirs))
1084 files = list(map(os.fsdecode, bfiles))
1085 yield (root, dirs, files)
1086 bdirs[:] = list(map(os.fsencode, dirs))
1087 bfiles[:] = list(map(os.fsencode, files))
1088
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001089@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1090class BytesFwalkTests(FwalkTests):
1091 """Tests for os.walk() with bytes."""
1092 def fwalk(self, top='.', *args, **kwargs):
1093 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1094 root = os.fsdecode(broot)
1095 dirs = list(map(os.fsdecode, bdirs))
1096 files = list(map(os.fsdecode, bfiles))
1097 yield (root, dirs, files, topfd)
1098 bdirs[:] = list(map(os.fsencode, dirs))
1099 bfiles[:] = list(map(os.fsencode, files))
1100
Charles-François Natali7372b062012-02-05 15:15:38 +01001101
Guido van Rossume7ba4952007-06-06 23:52:48 +00001102class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001103 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001104 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001105
1106 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001107 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001108 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1109 os.makedirs(path) # Should work
1110 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1111 os.makedirs(path)
1112
1113 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001114 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001115 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1116 os.makedirs(path)
1117 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1118 'dir5', 'dir6')
1119 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001120
Serhiy Storchakae304e332017-03-24 13:27:42 +02001121 def test_mode(self):
1122 with support.temp_umask(0o002):
1123 base = support.TESTFN
1124 parent = os.path.join(base, 'dir1')
1125 path = os.path.join(parent, 'dir2')
1126 os.makedirs(path, 0o555)
1127 self.assertTrue(os.path.exists(path))
1128 self.assertTrue(os.path.isdir(path))
1129 if os.name != 'nt':
1130 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1131 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1132
Terry Reedy5a22b652010-12-02 07:05:56 +00001133 def test_exist_ok_existing_directory(self):
1134 path = os.path.join(support.TESTFN, 'dir1')
1135 mode = 0o777
1136 old_mask = os.umask(0o022)
1137 os.makedirs(path, mode)
1138 self.assertRaises(OSError, os.makedirs, path, mode)
1139 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001140 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001141 os.makedirs(path, mode=mode, exist_ok=True)
1142 os.umask(old_mask)
1143
Martin Pantera82642f2015-11-19 04:48:44 +00001144 # Issue #25583: A drive root could raise PermissionError on Windows
1145 os.makedirs(os.path.abspath('/'), exist_ok=True)
1146
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001147 def test_exist_ok_s_isgid_directory(self):
1148 path = os.path.join(support.TESTFN, 'dir1')
1149 S_ISGID = stat.S_ISGID
1150 mode = 0o777
1151 old_mask = os.umask(0o022)
1152 try:
1153 existing_testfn_mode = stat.S_IMODE(
1154 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001155 try:
1156 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001157 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001158 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001159 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1160 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1161 # The os should apply S_ISGID from the parent dir for us, but
1162 # this test need not depend on that behavior. Be explicit.
1163 os.makedirs(path, mode | S_ISGID)
1164 # http://bugs.python.org/issue14992
1165 # Should not fail when the bit is already set.
1166 os.makedirs(path, mode, exist_ok=True)
1167 # remove the bit.
1168 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001169 # May work even when the bit is not already set when demanded.
1170 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001171 finally:
1172 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001173
1174 def test_exist_ok_existing_regular_file(self):
1175 base = support.TESTFN
1176 path = os.path.join(support.TESTFN, 'dir1')
1177 f = open(path, 'w')
1178 f.write('abc')
1179 f.close()
1180 self.assertRaises(OSError, os.makedirs, path)
1181 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1182 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1183 os.remove(path)
1184
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001185 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001186 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001187 'dir4', 'dir5', 'dir6')
1188 # If the tests failed, the bottom-most directory ('../dir6')
1189 # may not have been created, so we look for the outermost directory
1190 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001191 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001192 path = os.path.dirname(path)
1193
1194 os.removedirs(path)
1195
Andrew Svetlov405faed2012-12-25 12:18:09 +02001196
R David Murrayf2ad1732014-12-25 18:36:56 -05001197@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1198class ChownFileTests(unittest.TestCase):
1199
Berker Peksag036a71b2015-07-21 09:29:48 +03001200 @classmethod
1201 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001202 os.mkdir(support.TESTFN)
1203
1204 def test_chown_uid_gid_arguments_must_be_index(self):
1205 stat = os.stat(support.TESTFN)
1206 uid = stat.st_uid
1207 gid = stat.st_gid
1208 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1209 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1210 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1211 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1212 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1213
1214 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1215 def test_chown(self):
1216 gid_1, gid_2 = groups[:2]
1217 uid = os.stat(support.TESTFN).st_uid
1218 os.chown(support.TESTFN, uid, gid_1)
1219 gid = os.stat(support.TESTFN).st_gid
1220 self.assertEqual(gid, gid_1)
1221 os.chown(support.TESTFN, uid, gid_2)
1222 gid = os.stat(support.TESTFN).st_gid
1223 self.assertEqual(gid, gid_2)
1224
1225 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1226 "test needs root privilege and more than one user")
1227 def test_chown_with_root(self):
1228 uid_1, uid_2 = all_users[:2]
1229 gid = os.stat(support.TESTFN).st_gid
1230 os.chown(support.TESTFN, uid_1, gid)
1231 uid = os.stat(support.TESTFN).st_uid
1232 self.assertEqual(uid, uid_1)
1233 os.chown(support.TESTFN, uid_2, gid)
1234 uid = os.stat(support.TESTFN).st_uid
1235 self.assertEqual(uid, uid_2)
1236
1237 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1238 "test needs non-root account and more than one user")
1239 def test_chown_without_permission(self):
1240 uid_1, uid_2 = all_users[:2]
1241 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001242 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001243 os.chown(support.TESTFN, uid_1, gid)
1244 os.chown(support.TESTFN, uid_2, gid)
1245
Berker Peksag036a71b2015-07-21 09:29:48 +03001246 @classmethod
1247 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001248 os.rmdir(support.TESTFN)
1249
1250
Andrew Svetlov405faed2012-12-25 12:18:09 +02001251class RemoveDirsTests(unittest.TestCase):
1252 def setUp(self):
1253 os.makedirs(support.TESTFN)
1254
1255 def tearDown(self):
1256 support.rmtree(support.TESTFN)
1257
1258 def test_remove_all(self):
1259 dira = os.path.join(support.TESTFN, 'dira')
1260 os.mkdir(dira)
1261 dirb = os.path.join(dira, 'dirb')
1262 os.mkdir(dirb)
1263 os.removedirs(dirb)
1264 self.assertFalse(os.path.exists(dirb))
1265 self.assertFalse(os.path.exists(dira))
1266 self.assertFalse(os.path.exists(support.TESTFN))
1267
1268 def test_remove_partial(self):
1269 dira = os.path.join(support.TESTFN, 'dira')
1270 os.mkdir(dira)
1271 dirb = os.path.join(dira, 'dirb')
1272 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001273 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001274 os.removedirs(dirb)
1275 self.assertFalse(os.path.exists(dirb))
1276 self.assertTrue(os.path.exists(dira))
1277 self.assertTrue(os.path.exists(support.TESTFN))
1278
1279 def test_remove_nothing(self):
1280 dira = os.path.join(support.TESTFN, 'dira')
1281 os.mkdir(dira)
1282 dirb = os.path.join(dira, 'dirb')
1283 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001284 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001285 with self.assertRaises(OSError):
1286 os.removedirs(dirb)
1287 self.assertTrue(os.path.exists(dirb))
1288 self.assertTrue(os.path.exists(dira))
1289 self.assertTrue(os.path.exists(support.TESTFN))
1290
1291
Guido van Rossume7ba4952007-06-06 23:52:48 +00001292class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001293 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001294 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001295 f.write(b'hello')
1296 f.close()
1297 with open(os.devnull, 'rb') as f:
1298 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001299
Andrew Svetlov405faed2012-12-25 12:18:09 +02001300
Guido van Rossume7ba4952007-06-06 23:52:48 +00001301class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001302 def test_urandom_length(self):
1303 self.assertEqual(len(os.urandom(0)), 0)
1304 self.assertEqual(len(os.urandom(1)), 1)
1305 self.assertEqual(len(os.urandom(10)), 10)
1306 self.assertEqual(len(os.urandom(100)), 100)
1307 self.assertEqual(len(os.urandom(1000)), 1000)
1308
1309 def test_urandom_value(self):
1310 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001311 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001312 data2 = os.urandom(16)
1313 self.assertNotEqual(data1, data2)
1314
1315 def get_urandom_subprocess(self, count):
1316 code = '\n'.join((
1317 'import os, sys',
1318 'data = os.urandom(%s)' % count,
1319 'sys.stdout.buffer.write(data)',
1320 'sys.stdout.buffer.flush()'))
1321 out = assert_python_ok('-c', code)
1322 stdout = out[1]
1323 self.assertEqual(len(stdout), 16)
1324 return stdout
1325
1326 def test_urandom_subprocess(self):
1327 data1 = self.get_urandom_subprocess(16)
1328 data2 = self.get_urandom_subprocess(16)
1329 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001330
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001331
Victor Stinner9b1f4742016-09-06 16:18:52 -07001332@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1333class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001334 @classmethod
1335 def setUpClass(cls):
1336 try:
1337 os.getrandom(1)
1338 except OSError as exc:
1339 if exc.errno == errno.ENOSYS:
1340 # Python compiled on a more recent Linux version
1341 # than the current Linux kernel
1342 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1343 else:
1344 raise
1345
Victor Stinner9b1f4742016-09-06 16:18:52 -07001346 def test_getrandom_type(self):
1347 data = os.getrandom(16)
1348 self.assertIsInstance(data, bytes)
1349 self.assertEqual(len(data), 16)
1350
1351 def test_getrandom0(self):
1352 empty = os.getrandom(0)
1353 self.assertEqual(empty, b'')
1354
1355 def test_getrandom_random(self):
1356 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1357
1358 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1359 # resource /dev/random
1360
1361 def test_getrandom_nonblock(self):
1362 # The call must not fail. Check also that the flag exists
1363 try:
1364 os.getrandom(1, os.GRND_NONBLOCK)
1365 except BlockingIOError:
1366 # System urandom is not initialized yet
1367 pass
1368
1369 def test_getrandom_value(self):
1370 data1 = os.getrandom(16)
1371 data2 = os.getrandom(16)
1372 self.assertNotEqual(data1, data2)
1373
1374
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001375# os.urandom() doesn't use a file descriptor when it is implemented with the
1376# getentropy() function, the getrandom() function or the getrandom() syscall
1377OS_URANDOM_DONT_USE_FD = (
1378 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1379 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1380 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001381
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001382@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1383 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001384class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001385 @unittest.skipUnless(resource, "test requires the resource module")
1386 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001387 # Check urandom() failing when it is not able to open /dev/random.
1388 # We spawn a new process to make the test more robust (if getrlimit()
1389 # failed to restore the file descriptor limit after this, the whole
1390 # test suite would crash; this actually happened on the OS X Tiger
1391 # buildbot).
1392 code = """if 1:
1393 import errno
1394 import os
1395 import resource
1396
1397 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1398 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1399 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001400 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001401 except OSError as e:
1402 assert e.errno == errno.EMFILE, e.errno
1403 else:
1404 raise AssertionError("OSError not raised")
1405 """
1406 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001407
Antoine Pitroue472aea2014-04-26 14:33:03 +02001408 def test_urandom_fd_closed(self):
1409 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1410 # closed.
1411 code = """if 1:
1412 import os
1413 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001414 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001415 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001416 with test.support.SuppressCrashReport():
1417 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001418 sys.stdout.buffer.write(os.urandom(4))
1419 """
1420 rc, out, err = assert_python_ok('-Sc', code)
1421
1422 def test_urandom_fd_reopened(self):
1423 # Issue #21207: urandom() should detect its fd to /dev/urandom
1424 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001425 self.addCleanup(support.unlink, support.TESTFN)
1426 create_file(support.TESTFN, b"x" * 256)
1427
Antoine Pitroue472aea2014-04-26 14:33:03 +02001428 code = """if 1:
1429 import os
1430 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001431 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001432 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001433 with test.support.SuppressCrashReport():
1434 for fd in range(3, 256):
1435 try:
1436 os.close(fd)
1437 except OSError:
1438 pass
1439 else:
1440 # Found the urandom fd (XXX hopefully)
1441 break
1442 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001443 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001444 new_fd = f.fileno()
1445 # Issue #26935: posix allows new_fd and fd to be equal but
1446 # some libc implementations have dup2 return an error in this
1447 # case.
1448 if new_fd != fd:
1449 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001450 sys.stdout.buffer.write(os.urandom(4))
1451 sys.stdout.buffer.write(os.urandom(4))
1452 """.format(TESTFN=support.TESTFN)
1453 rc, out, err = assert_python_ok('-Sc', code)
1454 self.assertEqual(len(out), 8)
1455 self.assertNotEqual(out[0:4], out[4:8])
1456 rc, out2, err2 = assert_python_ok('-Sc', code)
1457 self.assertEqual(len(out2), 8)
1458 self.assertNotEqual(out2, out)
1459
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001460
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001461@contextlib.contextmanager
1462def _execvpe_mockup(defpath=None):
1463 """
1464 Stubs out execv and execve functions when used as context manager.
1465 Records exec calls. The mock execv and execve functions always raise an
1466 exception as they would normally never return.
1467 """
1468 # A list of tuples containing (function name, first arg, args)
1469 # of calls to execv or execve that have been made.
1470 calls = []
1471
1472 def mock_execv(name, *args):
1473 calls.append(('execv', name, args))
1474 raise RuntimeError("execv called")
1475
1476 def mock_execve(name, *args):
1477 calls.append(('execve', name, args))
1478 raise OSError(errno.ENOTDIR, "execve called")
1479
1480 try:
1481 orig_execv = os.execv
1482 orig_execve = os.execve
1483 orig_defpath = os.defpath
1484 os.execv = mock_execv
1485 os.execve = mock_execve
1486 if defpath is not None:
1487 os.defpath = defpath
1488 yield calls
1489 finally:
1490 os.execv = orig_execv
1491 os.execve = orig_execve
1492 os.defpath = orig_defpath
1493
Victor Stinner4659ccf2016-09-14 10:57:00 +02001494
Guido van Rossume7ba4952007-06-06 23:52:48 +00001495class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001496 @unittest.skipIf(USING_LINUXTHREADS,
1497 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001498 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001499 self.assertRaises(OSError, os.execvpe, 'no such app-',
1500 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001501
Steve Dowerbce26262016-11-19 19:17:26 -08001502 def test_execv_with_bad_arglist(self):
1503 self.assertRaises(ValueError, os.execv, 'notepad', ())
1504 self.assertRaises(ValueError, os.execv, 'notepad', [])
1505 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1506 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1507
Thomas Heller6790d602007-08-30 17:15:14 +00001508 def test_execvpe_with_bad_arglist(self):
1509 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001510 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1511 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001512
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001513 @unittest.skipUnless(hasattr(os, '_execvpe'),
1514 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001515 def _test_internal_execvpe(self, test_type):
1516 program_path = os.sep + 'absolutepath'
1517 if test_type is bytes:
1518 program = b'executable'
1519 fullpath = os.path.join(os.fsencode(program_path), program)
1520 native_fullpath = fullpath
1521 arguments = [b'progname', 'arg1', 'arg2']
1522 else:
1523 program = 'executable'
1524 arguments = ['progname', 'arg1', 'arg2']
1525 fullpath = os.path.join(program_path, program)
1526 if os.name != "nt":
1527 native_fullpath = os.fsencode(fullpath)
1528 else:
1529 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001530 env = {'spam': 'beans'}
1531
Victor Stinnerb745a742010-05-18 17:17:23 +00001532 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001533 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001534 self.assertRaises(RuntimeError,
1535 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001536 self.assertEqual(len(calls), 1)
1537 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1538
Victor Stinnerb745a742010-05-18 17:17:23 +00001539 # test os._execvpe() with a relative path:
1540 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001541 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001542 self.assertRaises(OSError,
1543 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001544 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001545 self.assertSequenceEqual(calls[0],
1546 ('execve', native_fullpath, (arguments, env)))
1547
1548 # test os._execvpe() with a relative path:
1549 # os.get_exec_path() reads the 'PATH' variable
1550 with _execvpe_mockup() as calls:
1551 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001552 if test_type is bytes:
1553 env_path[b'PATH'] = program_path
1554 else:
1555 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001556 self.assertRaises(OSError,
1557 os._execvpe, program, arguments, env=env_path)
1558 self.assertEqual(len(calls), 1)
1559 self.assertSequenceEqual(calls[0],
1560 ('execve', native_fullpath, (arguments, env_path)))
1561
1562 def test_internal_execvpe_str(self):
1563 self._test_internal_execvpe(str)
1564 if os.name != "nt":
1565 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001566
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001567
Serhiy Storchaka43767632013-11-03 21:31:38 +02001568@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001569class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001570 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001571 try:
1572 os.stat(support.TESTFN)
1573 except FileNotFoundError:
1574 exists = False
1575 except OSError as exc:
1576 exists = True
1577 self.fail("file %s must not exist; os.stat failed with %s"
1578 % (support.TESTFN, exc))
1579 else:
1580 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001581
Thomas Wouters477c8d52006-05-27 19:21:47 +00001582 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001583 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001584
1585 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001586 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001587
1588 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001589 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001590
1591 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001592 self.addCleanup(support.unlink, support.TESTFN)
1593
Victor Stinnere77c9742016-03-25 10:28:23 +01001594 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001595 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001596
1597 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001598 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001599
Thomas Wouters477c8d52006-05-27 19:21:47 +00001600 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001601 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001602
Victor Stinnere77c9742016-03-25 10:28:23 +01001603
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001604class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001605 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001606 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1607 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001608 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001609 def get_single(f):
1610 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001611 if hasattr(os, f):
1612 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001613 return helper
1614 for f in singles:
1615 locals()["test_"+f] = get_single(f)
1616
Benjamin Peterson7522c742009-01-19 21:00:09 +00001617 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001618 try:
1619 f(support.make_bad_fd(), *args)
1620 except OSError as e:
1621 self.assertEqual(e.errno, errno.EBADF)
1622 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001623 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001624 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001625
Serhiy Storchaka43767632013-11-03 21:31:38 +02001626 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001627 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001628 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001629
Serhiy Storchaka43767632013-11-03 21:31:38 +02001630 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001631 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001632 fd = support.make_bad_fd()
1633 # Make sure none of the descriptors we are about to close are
1634 # currently valid (issue 6542).
1635 for i in range(10):
1636 try: os.fstat(fd+i)
1637 except OSError:
1638 pass
1639 else:
1640 break
1641 if i < 2:
1642 raise unittest.SkipTest(
1643 "Unable to acquire a range of invalid file descriptors")
1644 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001645
Serhiy Storchaka43767632013-11-03 21:31:38 +02001646 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001647 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001648 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001649
Serhiy Storchaka43767632013-11-03 21:31:38 +02001650 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001651 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001652 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001653
Serhiy Storchaka43767632013-11-03 21:31:38 +02001654 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001655 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001656 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001657
Serhiy Storchaka43767632013-11-03 21:31:38 +02001658 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001659 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001660 self.check(os.pathconf, "PC_NAME_MAX")
1661 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 self.check(os.truncate, 0)
1666 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001667
Serhiy Storchaka43767632013-11-03 21:31:38 +02001668 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001669 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001670 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001671
Serhiy Storchaka43767632013-11-03 21:31:38 +02001672 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001673 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001674 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001675
Victor Stinner57ddf782014-01-08 15:21:28 +01001676 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1677 def test_readv(self):
1678 buf = bytearray(10)
1679 self.check(os.readv, [buf])
1680
Serhiy Storchaka43767632013-11-03 21:31:38 +02001681 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001682 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001686 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001687 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001688
Victor Stinner57ddf782014-01-08 15:21:28 +01001689 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1690 def test_writev(self):
1691 self.check(os.writev, [b'abc'])
1692
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001693 def test_inheritable(self):
1694 self.check(os.get_inheritable)
1695 self.check(os.set_inheritable, True)
1696
1697 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1698 'needs os.get_blocking() and os.set_blocking()')
1699 def test_blocking(self):
1700 self.check(os.get_blocking)
1701 self.check(os.set_blocking, True)
1702
Brian Curtin1b9df392010-11-24 20:24:31 +00001703
1704class LinkTests(unittest.TestCase):
1705 def setUp(self):
1706 self.file1 = support.TESTFN
1707 self.file2 = os.path.join(support.TESTFN + "2")
1708
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001709 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001710 for file in (self.file1, self.file2):
1711 if os.path.exists(file):
1712 os.unlink(file)
1713
Brian Curtin1b9df392010-11-24 20:24:31 +00001714 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001715 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001716
Steve Dowercc16be82016-09-08 10:35:16 -07001717 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001718 with open(file1, "r") as f1, open(file2, "r") as f2:
1719 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1720
1721 def test_link(self):
1722 self._test_link(self.file1, self.file2)
1723
1724 def test_link_bytes(self):
1725 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1726 bytes(self.file2, sys.getfilesystemencoding()))
1727
Brian Curtinf498b752010-11-30 15:54:04 +00001728 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001729 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001730 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001731 except UnicodeError:
1732 raise unittest.SkipTest("Unable to encode for this platform.")
1733
Brian Curtinf498b752010-11-30 15:54:04 +00001734 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001735 self.file2 = self.file1 + "2"
1736 self._test_link(self.file1, self.file2)
1737
Serhiy Storchaka43767632013-11-03 21:31:38 +02001738@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1739class PosixUidGidTests(unittest.TestCase):
1740 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1741 def test_setuid(self):
1742 if os.getuid() != 0:
1743 self.assertRaises(OSError, os.setuid, 0)
1744 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001745
Serhiy Storchaka43767632013-11-03 21:31:38 +02001746 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1747 def test_setgid(self):
1748 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1749 self.assertRaises(OSError, os.setgid, 0)
1750 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001751
Serhiy Storchaka43767632013-11-03 21:31:38 +02001752 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1753 def test_seteuid(self):
1754 if os.getuid() != 0:
1755 self.assertRaises(OSError, os.seteuid, 0)
1756 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001757
Serhiy Storchaka43767632013-11-03 21:31:38 +02001758 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1759 def test_setegid(self):
1760 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1761 self.assertRaises(OSError, os.setegid, 0)
1762 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001763
Serhiy Storchaka43767632013-11-03 21:31:38 +02001764 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1765 def test_setreuid(self):
1766 if os.getuid() != 0:
1767 self.assertRaises(OSError, os.setreuid, 0, 0)
1768 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1769 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001770
Serhiy Storchaka43767632013-11-03 21:31:38 +02001771 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1772 def test_setreuid_neg1(self):
1773 # Needs to accept -1. We run this in a subprocess to avoid
1774 # altering the test runner's process state (issue8045).
1775 subprocess.check_call([
1776 sys.executable, '-c',
1777 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001778
Serhiy Storchaka43767632013-11-03 21:31:38 +02001779 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1780 def test_setregid(self):
1781 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1782 self.assertRaises(OSError, os.setregid, 0, 0)
1783 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1784 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001785
Serhiy Storchaka43767632013-11-03 21:31:38 +02001786 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1787 def test_setregid_neg1(self):
1788 # Needs to accept -1. We run this in a subprocess to avoid
1789 # altering the test runner's process state (issue8045).
1790 subprocess.check_call([
1791 sys.executable, '-c',
1792 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001793
Serhiy Storchaka43767632013-11-03 21:31:38 +02001794@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1795class Pep383Tests(unittest.TestCase):
1796 def setUp(self):
1797 if support.TESTFN_UNENCODABLE:
1798 self.dir = support.TESTFN_UNENCODABLE
1799 elif support.TESTFN_NONASCII:
1800 self.dir = support.TESTFN_NONASCII
1801 else:
1802 self.dir = support.TESTFN
1803 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001804
Serhiy Storchaka43767632013-11-03 21:31:38 +02001805 bytesfn = []
1806 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001807 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001808 fn = os.fsencode(fn)
1809 except UnicodeEncodeError:
1810 return
1811 bytesfn.append(fn)
1812 add_filename(support.TESTFN_UNICODE)
1813 if support.TESTFN_UNENCODABLE:
1814 add_filename(support.TESTFN_UNENCODABLE)
1815 if support.TESTFN_NONASCII:
1816 add_filename(support.TESTFN_NONASCII)
1817 if not bytesfn:
1818 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001819
Serhiy Storchaka43767632013-11-03 21:31:38 +02001820 self.unicodefn = set()
1821 os.mkdir(self.dir)
1822 try:
1823 for fn in bytesfn:
1824 support.create_empty_file(os.path.join(self.bdir, fn))
1825 fn = os.fsdecode(fn)
1826 if fn in self.unicodefn:
1827 raise ValueError("duplicate filename")
1828 self.unicodefn.add(fn)
1829 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001830 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001831 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001832
Serhiy Storchaka43767632013-11-03 21:31:38 +02001833 def tearDown(self):
1834 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001835
Serhiy Storchaka43767632013-11-03 21:31:38 +02001836 def test_listdir(self):
1837 expected = self.unicodefn
1838 found = set(os.listdir(self.dir))
1839 self.assertEqual(found, expected)
1840 # test listdir without arguments
1841 current_directory = os.getcwd()
1842 try:
1843 os.chdir(os.sep)
1844 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1845 finally:
1846 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001847
Serhiy Storchaka43767632013-11-03 21:31:38 +02001848 def test_open(self):
1849 for fn in self.unicodefn:
1850 f = open(os.path.join(self.dir, fn), 'rb')
1851 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001852
Serhiy Storchaka43767632013-11-03 21:31:38 +02001853 @unittest.skipUnless(hasattr(os, 'statvfs'),
1854 "need os.statvfs()")
1855 def test_statvfs(self):
1856 # issue #9645
1857 for fn in self.unicodefn:
1858 # should not fail with file not found error
1859 fullname = os.path.join(self.dir, fn)
1860 os.statvfs(fullname)
1861
1862 def test_stat(self):
1863 for fn in self.unicodefn:
1864 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001865
Brian Curtineb24d742010-04-12 17:16:38 +00001866@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1867class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001868 def _kill(self, sig):
1869 # Start sys.executable as a subprocess and communicate from the
1870 # subprocess to the parent that the interpreter is ready. When it
1871 # becomes ready, send *sig* via os.kill to the subprocess and check
1872 # that the return code is equal to *sig*.
1873 import ctypes
1874 from ctypes import wintypes
1875 import msvcrt
1876
1877 # Since we can't access the contents of the process' stdout until the
1878 # process has exited, use PeekNamedPipe to see what's inside stdout
1879 # without waiting. This is done so we can tell that the interpreter
1880 # is started and running at a point where it could handle a signal.
1881 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1882 PeekNamedPipe.restype = wintypes.BOOL
1883 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1884 ctypes.POINTER(ctypes.c_char), # stdout buf
1885 wintypes.DWORD, # Buffer size
1886 ctypes.POINTER(wintypes.DWORD), # bytes read
1887 ctypes.POINTER(wintypes.DWORD), # bytes avail
1888 ctypes.POINTER(wintypes.DWORD)) # bytes left
1889 msg = "running"
1890 proc = subprocess.Popen([sys.executable, "-c",
1891 "import sys;"
1892 "sys.stdout.write('{}');"
1893 "sys.stdout.flush();"
1894 "input()".format(msg)],
1895 stdout=subprocess.PIPE,
1896 stderr=subprocess.PIPE,
1897 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001898 self.addCleanup(proc.stdout.close)
1899 self.addCleanup(proc.stderr.close)
1900 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001901
1902 count, max = 0, 100
1903 while count < max and proc.poll() is None:
1904 # Create a string buffer to store the result of stdout from the pipe
1905 buf = ctypes.create_string_buffer(len(msg))
1906 # Obtain the text currently in proc.stdout
1907 # Bytes read/avail/left are left as NULL and unused
1908 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1909 buf, ctypes.sizeof(buf), None, None, None)
1910 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1911 if buf.value:
1912 self.assertEqual(msg, buf.value.decode())
1913 break
1914 time.sleep(0.1)
1915 count += 1
1916 else:
1917 self.fail("Did not receive communication from the subprocess")
1918
Brian Curtineb24d742010-04-12 17:16:38 +00001919 os.kill(proc.pid, sig)
1920 self.assertEqual(proc.wait(), sig)
1921
1922 def test_kill_sigterm(self):
1923 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001924 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001925
1926 def test_kill_int(self):
1927 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001928 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001929
1930 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001931 tagname = "test_os_%s" % uuid.uuid1()
1932 m = mmap.mmap(-1, 1, tagname)
1933 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001934 # Run a script which has console control handling enabled.
1935 proc = subprocess.Popen([sys.executable,
1936 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001937 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001938 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1939 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001940 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001941 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001942 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001943 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001944 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001945 count += 1
1946 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001947 # Forcefully kill the process if we weren't able to signal it.
1948 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001949 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001950 os.kill(proc.pid, event)
1951 # proc.send_signal(event) could also be done here.
1952 # Allow time for the signal to be passed and the process to exit.
1953 time.sleep(0.5)
1954 if not proc.poll():
1955 # Forcefully kill the process if we weren't able to signal it.
1956 os.kill(proc.pid, signal.SIGINT)
1957 self.fail("subprocess did not stop on {}".format(name))
1958
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001959 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001960 def test_CTRL_C_EVENT(self):
1961 from ctypes import wintypes
1962 import ctypes
1963
1964 # Make a NULL value by creating a pointer with no argument.
1965 NULL = ctypes.POINTER(ctypes.c_int)()
1966 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1967 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1968 wintypes.BOOL)
1969 SetConsoleCtrlHandler.restype = wintypes.BOOL
1970
1971 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001972 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001973 # by subprocesses.
1974 SetConsoleCtrlHandler(NULL, 0)
1975
1976 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1977
1978 def test_CTRL_BREAK_EVENT(self):
1979 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1980
1981
Brian Curtind40e6f72010-07-08 21:39:08 +00001982@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001983class Win32ListdirTests(unittest.TestCase):
1984 """Test listdir on Windows."""
1985
1986 def setUp(self):
1987 self.created_paths = []
1988 for i in range(2):
1989 dir_name = 'SUB%d' % i
1990 dir_path = os.path.join(support.TESTFN, dir_name)
1991 file_name = 'FILE%d' % i
1992 file_path = os.path.join(support.TESTFN, file_name)
1993 os.makedirs(dir_path)
1994 with open(file_path, 'w') as f:
1995 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1996 self.created_paths.extend([dir_name, file_name])
1997 self.created_paths.sort()
1998
1999 def tearDown(self):
2000 shutil.rmtree(support.TESTFN)
2001
2002 def test_listdir_no_extended_path(self):
2003 """Test when the path is not an "extended" path."""
2004 # unicode
2005 self.assertEqual(
2006 sorted(os.listdir(support.TESTFN)),
2007 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002008
Tim Golden781bbeb2013-10-25 20:24:06 +01002009 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002010 self.assertEqual(
2011 sorted(os.listdir(os.fsencode(support.TESTFN))),
2012 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002013
2014 def test_listdir_extended_path(self):
2015 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002016 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002017 # unicode
2018 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2019 self.assertEqual(
2020 sorted(os.listdir(path)),
2021 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002022
Tim Golden781bbeb2013-10-25 20:24:06 +01002023 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002024 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2025 self.assertEqual(
2026 sorted(os.listdir(path)),
2027 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002028
2029
2030@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002031@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002032class Win32SymlinkTests(unittest.TestCase):
2033 filelink = 'filelinktest'
2034 filelink_target = os.path.abspath(__file__)
2035 dirlink = 'dirlinktest'
2036 dirlink_target = os.path.dirname(filelink_target)
2037 missing_link = 'missing link'
2038
2039 def setUp(self):
2040 assert os.path.exists(self.dirlink_target)
2041 assert os.path.exists(self.filelink_target)
2042 assert not os.path.exists(self.dirlink)
2043 assert not os.path.exists(self.filelink)
2044 assert not os.path.exists(self.missing_link)
2045
2046 def tearDown(self):
2047 if os.path.exists(self.filelink):
2048 os.remove(self.filelink)
2049 if os.path.exists(self.dirlink):
2050 os.rmdir(self.dirlink)
2051 if os.path.lexists(self.missing_link):
2052 os.remove(self.missing_link)
2053
2054 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002055 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002056 self.assertTrue(os.path.exists(self.dirlink))
2057 self.assertTrue(os.path.isdir(self.dirlink))
2058 self.assertTrue(os.path.islink(self.dirlink))
2059 self.check_stat(self.dirlink, self.dirlink_target)
2060
2061 def test_file_link(self):
2062 os.symlink(self.filelink_target, self.filelink)
2063 self.assertTrue(os.path.exists(self.filelink))
2064 self.assertTrue(os.path.isfile(self.filelink))
2065 self.assertTrue(os.path.islink(self.filelink))
2066 self.check_stat(self.filelink, self.filelink_target)
2067
2068 def _create_missing_dir_link(self):
2069 'Create a "directory" link to a non-existent target'
2070 linkname = self.missing_link
2071 if os.path.lexists(linkname):
2072 os.remove(linkname)
2073 target = r'c:\\target does not exist.29r3c740'
2074 assert not os.path.exists(target)
2075 target_is_dir = True
2076 os.symlink(target, linkname, target_is_dir)
2077
2078 def test_remove_directory_link_to_missing_target(self):
2079 self._create_missing_dir_link()
2080 # For compatibility with Unix, os.remove will check the
2081 # directory status and call RemoveDirectory if the symlink
2082 # was created with target_is_dir==True.
2083 os.remove(self.missing_link)
2084
2085 @unittest.skip("currently fails; consider for improvement")
2086 def test_isdir_on_directory_link_to_missing_target(self):
2087 self._create_missing_dir_link()
2088 # consider having isdir return true for directory links
2089 self.assertTrue(os.path.isdir(self.missing_link))
2090
2091 @unittest.skip("currently fails; consider for improvement")
2092 def test_rmdir_on_directory_link_to_missing_target(self):
2093 self._create_missing_dir_link()
2094 # consider allowing rmdir to remove directory links
2095 os.rmdir(self.missing_link)
2096
2097 def check_stat(self, link, target):
2098 self.assertEqual(os.stat(link), os.stat(target))
2099 self.assertNotEqual(os.lstat(link), os.stat(link))
2100
Brian Curtind25aef52011-06-13 15:16:04 -05002101 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002102 self.assertEqual(os.stat(bytes_link), os.stat(target))
2103 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002104
2105 def test_12084(self):
2106 level1 = os.path.abspath(support.TESTFN)
2107 level2 = os.path.join(level1, "level2")
2108 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002109 self.addCleanup(support.rmtree, level1)
2110
2111 os.mkdir(level1)
2112 os.mkdir(level2)
2113 os.mkdir(level3)
2114
2115 file1 = os.path.abspath(os.path.join(level1, "file1"))
2116 create_file(file1)
2117
2118 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002119 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002120 os.chdir(level2)
2121 link = os.path.join(level2, "link")
2122 os.symlink(os.path.relpath(file1), "link")
2123 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002124
Victor Stinnerae39d232016-03-24 17:12:55 +01002125 # Check os.stat calls from the same dir as the link
2126 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002127
Victor Stinnerae39d232016-03-24 17:12:55 +01002128 # Check os.stat calls from a dir below the link
2129 os.chdir(level1)
2130 self.assertEqual(os.stat(file1),
2131 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002132
Victor Stinnerae39d232016-03-24 17:12:55 +01002133 # Check os.stat calls from a dir above the link
2134 os.chdir(level3)
2135 self.assertEqual(os.stat(file1),
2136 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002137 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002138 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002139
Brian Curtind40e6f72010-07-08 21:39:08 +00002140
Tim Golden0321cf22014-05-05 19:46:17 +01002141@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2142class Win32JunctionTests(unittest.TestCase):
2143 junction = 'junctiontest'
2144 junction_target = os.path.dirname(os.path.abspath(__file__))
2145
2146 def setUp(self):
2147 assert os.path.exists(self.junction_target)
2148 assert not os.path.exists(self.junction)
2149
2150 def tearDown(self):
2151 if os.path.exists(self.junction):
2152 # os.rmdir delegates to Windows' RemoveDirectoryW,
2153 # which removes junction points safely.
2154 os.rmdir(self.junction)
2155
2156 def test_create_junction(self):
2157 _winapi.CreateJunction(self.junction_target, self.junction)
2158 self.assertTrue(os.path.exists(self.junction))
2159 self.assertTrue(os.path.isdir(self.junction))
2160
2161 # Junctions are not recognized as links.
2162 self.assertFalse(os.path.islink(self.junction))
2163
2164 def test_unlink_removes_junction(self):
2165 _winapi.CreateJunction(self.junction_target, self.junction)
2166 self.assertTrue(os.path.exists(self.junction))
2167
2168 os.unlink(self.junction)
2169 self.assertFalse(os.path.exists(self.junction))
2170
2171
Jason R. Coombs3a092862013-05-27 23:21:28 -04002172@support.skip_unless_symlink
2173class NonLocalSymlinkTests(unittest.TestCase):
2174
2175 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002176 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002177 Create this structure:
2178
2179 base
2180 \___ some_dir
2181 """
2182 os.makedirs('base/some_dir')
2183
2184 def tearDown(self):
2185 shutil.rmtree('base')
2186
2187 def test_directory_link_nonlocal(self):
2188 """
2189 The symlink target should resolve relative to the link, not relative
2190 to the current directory.
2191
2192 Then, link base/some_link -> base/some_dir and ensure that some_link
2193 is resolved as a directory.
2194
2195 In issue13772, it was discovered that directory detection failed if
2196 the symlink target was not specified relative to the current
2197 directory, which was a defect in the implementation.
2198 """
2199 src = os.path.join('base', 'some_link')
2200 os.symlink('some_dir', src)
2201 assert os.path.isdir(src)
2202
2203
Victor Stinnere8d51452010-08-19 01:05:19 +00002204class FSEncodingTests(unittest.TestCase):
2205 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002206 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2207 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002208
Victor Stinnere8d51452010-08-19 01:05:19 +00002209 def test_identity(self):
2210 # assert fsdecode(fsencode(x)) == x
2211 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2212 try:
2213 bytesfn = os.fsencode(fn)
2214 except UnicodeEncodeError:
2215 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002216 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002217
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002218
Brett Cannonefb00c02012-02-29 18:31:31 -05002219
2220class DeviceEncodingTests(unittest.TestCase):
2221
2222 def test_bad_fd(self):
2223 # Return None when an fd doesn't actually exist.
2224 self.assertIsNone(os.device_encoding(123456))
2225
Philip Jenveye308b7c2012-02-29 16:16:15 -08002226 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2227 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002228 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002229 def test_device_encoding(self):
2230 encoding = os.device_encoding(0)
2231 self.assertIsNotNone(encoding)
2232 self.assertTrue(codecs.lookup(encoding))
2233
2234
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002235class PidTests(unittest.TestCase):
2236 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2237 def test_getppid(self):
2238 p = subprocess.Popen([sys.executable, '-c',
2239 'import os; print(os.getppid())'],
2240 stdout=subprocess.PIPE)
2241 stdout, _ = p.communicate()
2242 # We are the parent of our subprocess
2243 self.assertEqual(int(stdout), os.getpid())
2244
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002245 def test_waitpid(self):
2246 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002247 # Add an implicit test for PyUnicode_FSConverter().
2248 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002249 status = os.waitpid(pid, 0)
2250 self.assertEqual(status, (pid, 0))
2251
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002252
Victor Stinner4659ccf2016-09-14 10:57:00 +02002253class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002254 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002255 self.exitcode = 17
2256
2257 filename = support.TESTFN
2258 self.addCleanup(support.unlink, filename)
2259
2260 if not with_env:
2261 code = 'import sys; sys.exit(%s)' % self.exitcode
2262 else:
2263 self.env = dict(os.environ)
2264 # create an unique key
2265 self.key = str(uuid.uuid4())
2266 self.env[self.key] = self.key
2267 # read the variable from os.environ to check that it exists
2268 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2269 % (self.key, self.exitcode))
2270
2271 with open(filename, "w") as fp:
2272 fp.write(code)
2273
Berker Peksag81816462016-09-15 20:19:47 +03002274 args = [sys.executable, filename]
2275 if use_bytes:
2276 args = [os.fsencode(a) for a in args]
2277 self.env = {os.fsencode(k): os.fsencode(v)
2278 for k, v in self.env.items()}
2279
2280 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002281
Berker Peksag4af23d72016-09-15 20:32:44 +03002282 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002283 def test_spawnl(self):
2284 args = self.create_args()
2285 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2286 self.assertEqual(exitcode, self.exitcode)
2287
Berker Peksag4af23d72016-09-15 20:32:44 +03002288 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002289 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002290 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002291 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2292 self.assertEqual(exitcode, self.exitcode)
2293
Berker Peksag4af23d72016-09-15 20:32:44 +03002294 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002295 def test_spawnlp(self):
2296 args = self.create_args()
2297 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2298 self.assertEqual(exitcode, self.exitcode)
2299
Berker Peksag4af23d72016-09-15 20:32:44 +03002300 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002301 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002302 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002303 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2304 self.assertEqual(exitcode, self.exitcode)
2305
Berker Peksag4af23d72016-09-15 20:32:44 +03002306 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002307 def test_spawnv(self):
2308 args = self.create_args()
2309 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2310 self.assertEqual(exitcode, self.exitcode)
2311
Berker Peksag4af23d72016-09-15 20:32:44 +03002312 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002313 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002314 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002315 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2316 self.assertEqual(exitcode, self.exitcode)
2317
Berker Peksag4af23d72016-09-15 20:32:44 +03002318 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002319 def test_spawnvp(self):
2320 args = self.create_args()
2321 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2322 self.assertEqual(exitcode, self.exitcode)
2323
Berker Peksag4af23d72016-09-15 20:32:44 +03002324 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002325 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002326 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002327 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2328 self.assertEqual(exitcode, self.exitcode)
2329
Berker Peksag4af23d72016-09-15 20:32:44 +03002330 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002331 def test_nowait(self):
2332 args = self.create_args()
2333 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2334 result = os.waitpid(pid, 0)
2335 self.assertEqual(result[0], pid)
2336 status = result[1]
2337 if hasattr(os, 'WIFEXITED'):
2338 self.assertTrue(os.WIFEXITED(status))
2339 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2340 else:
2341 self.assertEqual(status, self.exitcode << 8)
2342
Berker Peksag4af23d72016-09-15 20:32:44 +03002343 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002344 def test_spawnve_bytes(self):
2345 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2346 args = self.create_args(with_env=True, use_bytes=True)
2347 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2348 self.assertEqual(exitcode, self.exitcode)
2349
Steve Dower859fd7b2016-11-19 18:53:19 -08002350 @requires_os_func('spawnl')
2351 def test_spawnl_noargs(self):
2352 args = self.create_args()
2353 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002354 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002355
2356 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002357 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002358 args = self.create_args()
2359 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002360 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002361
2362 @requires_os_func('spawnv')
2363 def test_spawnv_noargs(self):
2364 args = self.create_args()
2365 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2366 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002367 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2368 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002369
2370 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002371 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002372 args = self.create_args()
2373 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2374 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002375 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2376 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002377
Brian Curtin0151b8e2010-09-24 13:43:43 +00002378# The introduction of this TestCase caused at least two different errors on
2379# *nix buildbots. Temporarily skip this to let the buildbots move along.
2380@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002381@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2382class LoginTests(unittest.TestCase):
2383 def test_getlogin(self):
2384 user_name = os.getlogin()
2385 self.assertNotEqual(len(user_name), 0)
2386
2387
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002388@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2389 "needs os.getpriority and os.setpriority")
2390class ProgramPriorityTests(unittest.TestCase):
2391 """Tests for os.getpriority() and os.setpriority()."""
2392
2393 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002394
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002395 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2396 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2397 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002398 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2399 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002400 raise unittest.SkipTest("unable to reliably test setpriority "
2401 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002402 else:
2403 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002404 finally:
2405 try:
2406 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2407 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002408 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002409 raise
2410
2411
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002412if threading is not None:
2413 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002414
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002415 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002416
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002417 def __init__(self, conn):
2418 asynchat.async_chat.__init__(self, conn)
2419 self.in_buffer = []
2420 self.closed = False
2421 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002422
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002423 def handle_read(self):
2424 data = self.recv(4096)
2425 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002426
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002427 def get_data(self):
2428 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002429
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002430 def handle_close(self):
2431 self.close()
2432 self.closed = True
2433
2434 def handle_error(self):
2435 raise
2436
2437 def __init__(self, address):
2438 threading.Thread.__init__(self)
2439 asyncore.dispatcher.__init__(self)
2440 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2441 self.bind(address)
2442 self.listen(5)
2443 self.host, self.port = self.socket.getsockname()[:2]
2444 self.handler_instance = None
2445 self._active = False
2446 self._active_lock = threading.Lock()
2447
2448 # --- public API
2449
2450 @property
2451 def running(self):
2452 return self._active
2453
2454 def start(self):
2455 assert not self.running
2456 self.__flag = threading.Event()
2457 threading.Thread.start(self)
2458 self.__flag.wait()
2459
2460 def stop(self):
2461 assert self.running
2462 self._active = False
2463 self.join()
2464
2465 def wait(self):
2466 # wait for handler connection to be closed, then stop the server
2467 while not getattr(self.handler_instance, "closed", False):
2468 time.sleep(0.001)
2469 self.stop()
2470
2471 # --- internals
2472
2473 def run(self):
2474 self._active = True
2475 self.__flag.set()
2476 while self._active and asyncore.socket_map:
2477 self._active_lock.acquire()
2478 asyncore.loop(timeout=0.001, count=1)
2479 self._active_lock.release()
2480 asyncore.close_all()
2481
2482 def handle_accept(self):
2483 conn, addr = self.accept()
2484 self.handler_instance = self.Handler(conn)
2485
2486 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002487 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002488 handle_read = handle_connect
2489
2490 def writable(self):
2491 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002492
2493 def handle_error(self):
2494 raise
2495
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002496
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002497@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002498@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2499class TestSendfile(unittest.TestCase):
2500
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002501 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002502 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002503 not sys.platform.startswith("solaris") and \
2504 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002505 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2506 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002507
2508 @classmethod
2509 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002510 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002511 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002512
2513 @classmethod
2514 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002515 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002516 support.unlink(support.TESTFN)
2517
2518 def setUp(self):
2519 self.server = SendfileTestServer((support.HOST, 0))
2520 self.server.start()
2521 self.client = socket.socket()
2522 self.client.connect((self.server.host, self.server.port))
2523 self.client.settimeout(1)
2524 # synchronize by waiting for "220 ready" response
2525 self.client.recv(1024)
2526 self.sockno = self.client.fileno()
2527 self.file = open(support.TESTFN, 'rb')
2528 self.fileno = self.file.fileno()
2529
2530 def tearDown(self):
2531 self.file.close()
2532 self.client.close()
2533 if self.server.running:
2534 self.server.stop()
2535
2536 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2537 """A higher level wrapper representing how an application is
2538 supposed to use sendfile().
2539 """
2540 while 1:
2541 try:
2542 if self.SUPPORT_HEADERS_TRAILERS:
2543 return os.sendfile(sock, file, offset, nbytes, headers,
2544 trailers)
2545 else:
2546 return os.sendfile(sock, file, offset, nbytes)
2547 except OSError as err:
2548 if err.errno == errno.ECONNRESET:
2549 # disconnected
2550 raise
2551 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2552 # we have to retry send data
2553 continue
2554 else:
2555 raise
2556
2557 def test_send_whole_file(self):
2558 # normal send
2559 total_sent = 0
2560 offset = 0
2561 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002562 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002563 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2564 if sent == 0:
2565 break
2566 offset += sent
2567 total_sent += sent
2568 self.assertTrue(sent <= nbytes)
2569 self.assertEqual(offset, total_sent)
2570
2571 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002572 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002573 self.client.close()
2574 self.server.wait()
2575 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002576 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002577 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002578
2579 def test_send_at_certain_offset(self):
2580 # start sending a file at a certain offset
2581 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002582 offset = len(self.DATA) // 2
2583 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002584 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002585 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002586 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2587 if sent == 0:
2588 break
2589 offset += sent
2590 total_sent += sent
2591 self.assertTrue(sent <= nbytes)
2592
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002593 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002594 self.client.close()
2595 self.server.wait()
2596 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002597 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002598 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002599 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002600 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002601
2602 def test_offset_overflow(self):
2603 # specify an offset > file size
2604 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002605 try:
2606 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2607 except OSError as e:
2608 # Solaris can raise EINVAL if offset >= file length, ignore.
2609 if e.errno != errno.EINVAL:
2610 raise
2611 else:
2612 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002613 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002614 self.client.close()
2615 self.server.wait()
2616 data = self.server.handler_instance.get_data()
2617 self.assertEqual(data, b'')
2618
2619 def test_invalid_offset(self):
2620 with self.assertRaises(OSError) as cm:
2621 os.sendfile(self.sockno, self.fileno, -1, 4096)
2622 self.assertEqual(cm.exception.errno, errno.EINVAL)
2623
Martin Panterbf19d162015-09-09 01:01:13 +00002624 def test_keywords(self):
2625 # Keyword arguments should be supported
2626 os.sendfile(out=self.sockno, offset=0, count=4096,
2627 **{'in': self.fileno})
2628 if self.SUPPORT_HEADERS_TRAILERS:
2629 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002630 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002631
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002632 # --- headers / trailers tests
2633
Serhiy Storchaka43767632013-11-03 21:31:38 +02002634 @requires_headers_trailers
2635 def test_headers(self):
2636 total_sent = 0
2637 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2638 headers=[b"x" * 512])
2639 total_sent += sent
2640 offset = 4096
2641 nbytes = 4096
2642 while 1:
2643 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2644 offset, nbytes)
2645 if sent == 0:
2646 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002647 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002648 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002649
Serhiy Storchaka43767632013-11-03 21:31:38 +02002650 expected_data = b"x" * 512 + self.DATA
2651 self.assertEqual(total_sent, len(expected_data))
2652 self.client.close()
2653 self.server.wait()
2654 data = self.server.handler_instance.get_data()
2655 self.assertEqual(hash(data), hash(expected_data))
2656
2657 @requires_headers_trailers
2658 def test_trailers(self):
2659 TESTFN2 = support.TESTFN + "2"
2660 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002661
2662 self.addCleanup(support.unlink, TESTFN2)
2663 create_file(TESTFN2, file_data)
2664
2665 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002666 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2667 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002668 self.client.close()
2669 self.server.wait()
2670 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002671 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002672
Serhiy Storchaka43767632013-11-03 21:31:38 +02002673 @requires_headers_trailers
2674 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2675 'test needs os.SF_NODISKIO')
2676 def test_flags(self):
2677 try:
2678 os.sendfile(self.sockno, self.fileno, 0, 4096,
2679 flags=os.SF_NODISKIO)
2680 except OSError as err:
2681 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2682 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002683
2684
Larry Hastings9cf065c2012-06-22 16:30:09 -07002685def supports_extended_attributes():
2686 if not hasattr(os, "setxattr"):
2687 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002688
Larry Hastings9cf065c2012-06-22 16:30:09 -07002689 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002690 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002691 try:
2692 os.setxattr(fp.fileno(), b"user.test", b"")
2693 except OSError:
2694 return False
2695 finally:
2696 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002697
2698 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002699
2700
2701@unittest.skipUnless(supports_extended_attributes(),
2702 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002703# Kernels < 2.6.39 don't respect setxattr flags.
2704@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002705class ExtendedAttributeTests(unittest.TestCase):
2706
Larry Hastings9cf065c2012-06-22 16:30:09 -07002707 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002708 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002709 self.addCleanup(support.unlink, fn)
2710 create_file(fn)
2711
Benjamin Peterson799bd802011-08-31 22:15:17 -04002712 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002713 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002714 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002715
Victor Stinnerf12e5062011-10-16 22:12:03 +02002716 init_xattr = listxattr(fn)
2717 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002718
Larry Hastings9cf065c2012-06-22 16:30:09 -07002719 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002720 xattr = set(init_xattr)
2721 xattr.add("user.test")
2722 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002723 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2724 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2725 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002726
Benjamin Peterson799bd802011-08-31 22:15:17 -04002727 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002728 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002729 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002730
Benjamin Peterson799bd802011-08-31 22:15:17 -04002731 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002732 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002733 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002734
Larry Hastings9cf065c2012-06-22 16:30:09 -07002735 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002736 xattr.add("user.test2")
2737 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002738 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002739
Benjamin Peterson799bd802011-08-31 22:15:17 -04002740 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002741 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002742 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002743
Victor Stinnerf12e5062011-10-16 22:12:03 +02002744 xattr.remove("user.test")
2745 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002746 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2747 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2748 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2749 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002750 many = sorted("user.test{}".format(i) for i in range(100))
2751 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002752 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002753 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002754
Larry Hastings9cf065c2012-06-22 16:30:09 -07002755 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002756 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002757 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002758
2759 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2760 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002761
2762 def test_simple(self):
2763 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2764 os.listxattr)
2765
2766 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002767 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2768 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002769
2770 def test_fds(self):
2771 def getxattr(path, *args):
2772 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002773 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002774 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002775 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002776 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002777 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002778 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002779 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002780 def listxattr(path, *args):
2781 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002782 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002783 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2784
2785
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002786@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2787class TermsizeTests(unittest.TestCase):
2788 def test_does_not_crash(self):
2789 """Check if get_terminal_size() returns a meaningful value.
2790
2791 There's no easy portable way to actually check the size of the
2792 terminal, so let's check if it returns something sensible instead.
2793 """
2794 try:
2795 size = os.get_terminal_size()
2796 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002797 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002798 # Under win32 a generic OSError can be thrown if the
2799 # handle cannot be retrieved
2800 self.skipTest("failed to query terminal size")
2801 raise
2802
Antoine Pitroucfade362012-02-08 23:48:59 +01002803 self.assertGreaterEqual(size.columns, 0)
2804 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002805
2806 def test_stty_match(self):
2807 """Check if stty returns the same results
2808
2809 stty actually tests stdin, so get_terminal_size is invoked on
2810 stdin explicitly. If stty succeeded, then get_terminal_size()
2811 should work too.
2812 """
2813 try:
2814 size = subprocess.check_output(['stty', 'size']).decode().split()
2815 except (FileNotFoundError, subprocess.CalledProcessError):
2816 self.skipTest("stty invocation failed")
2817 expected = (int(size[1]), int(size[0])) # reversed order
2818
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002819 try:
2820 actual = os.get_terminal_size(sys.__stdin__.fileno())
2821 except OSError as e:
2822 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2823 # Under win32 a generic OSError can be thrown if the
2824 # handle cannot be retrieved
2825 self.skipTest("failed to query terminal size")
2826 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002827 self.assertEqual(expected, actual)
2828
2829
Victor Stinner292c8352012-10-30 02:17:38 +01002830class OSErrorTests(unittest.TestCase):
2831 def setUp(self):
2832 class Str(str):
2833 pass
2834
Victor Stinnerafe17062012-10-31 22:47:43 +01002835 self.bytes_filenames = []
2836 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002837 if support.TESTFN_UNENCODABLE is not None:
2838 decoded = support.TESTFN_UNENCODABLE
2839 else:
2840 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002841 self.unicode_filenames.append(decoded)
2842 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002843 if support.TESTFN_UNDECODABLE is not None:
2844 encoded = support.TESTFN_UNDECODABLE
2845 else:
2846 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002847 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002848 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002849 self.bytes_filenames.append(memoryview(encoded))
2850
2851 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002852
2853 def test_oserror_filename(self):
2854 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002855 (self.filenames, os.chdir,),
2856 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002857 (self.filenames, os.lstat,),
2858 (self.filenames, os.open, os.O_RDONLY),
2859 (self.filenames, os.rmdir,),
2860 (self.filenames, os.stat,),
2861 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002862 ]
2863 if sys.platform == "win32":
2864 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002865 (self.bytes_filenames, os.rename, b"dst"),
2866 (self.bytes_filenames, os.replace, b"dst"),
2867 (self.unicode_filenames, os.rename, "dst"),
2868 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002869 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002870 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002871 else:
2872 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002873 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002874 (self.filenames, os.rename, "dst"),
2875 (self.filenames, os.replace, "dst"),
2876 ))
2877 if hasattr(os, "chown"):
2878 funcs.append((self.filenames, os.chown, 0, 0))
2879 if hasattr(os, "lchown"):
2880 funcs.append((self.filenames, os.lchown, 0, 0))
2881 if hasattr(os, "truncate"):
2882 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002883 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002884 funcs.append((self.filenames, os.chflags, 0))
2885 if hasattr(os, "lchflags"):
2886 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002887 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002888 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002889 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002890 if sys.platform == "win32":
2891 funcs.append((self.bytes_filenames, os.link, b"dst"))
2892 funcs.append((self.unicode_filenames, os.link, "dst"))
2893 else:
2894 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002895 if hasattr(os, "listxattr"):
2896 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002897 (self.filenames, os.listxattr,),
2898 (self.filenames, os.getxattr, "user.test"),
2899 (self.filenames, os.setxattr, "user.test", b'user'),
2900 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002901 ))
2902 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002903 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002904 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002905 if sys.platform == "win32":
2906 funcs.append((self.unicode_filenames, os.readlink,))
2907 else:
2908 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002909
Steve Dowercc16be82016-09-08 10:35:16 -07002910
Victor Stinnerafe17062012-10-31 22:47:43 +01002911 for filenames, func, *func_args in funcs:
2912 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002913 try:
Steve Dowercc16be82016-09-08 10:35:16 -07002914 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01002915 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002916 else:
2917 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2918 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002919 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07002920 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08002921 except UnicodeDecodeError:
2922 pass
Victor Stinner292c8352012-10-30 02:17:38 +01002923 else:
2924 self.fail("No exception thrown by {}".format(func))
2925
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002926class CPUCountTests(unittest.TestCase):
2927 def test_cpu_count(self):
2928 cpus = os.cpu_count()
2929 if cpus is not None:
2930 self.assertIsInstance(cpus, int)
2931 self.assertGreater(cpus, 0)
2932 else:
2933 self.skipTest("Could not determine the number of CPUs")
2934
Victor Stinnerdaf45552013-08-28 00:53:59 +02002935
2936class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002937 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002938 fd = os.open(__file__, os.O_RDONLY)
2939 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002940 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002941
Victor Stinnerdaf45552013-08-28 00:53:59 +02002942 os.set_inheritable(fd, True)
2943 self.assertEqual(os.get_inheritable(fd), True)
2944
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002945 @unittest.skipIf(fcntl is None, "need fcntl")
2946 def test_get_inheritable_cloexec(self):
2947 fd = os.open(__file__, os.O_RDONLY)
2948 self.addCleanup(os.close, fd)
2949 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002950
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002951 # clear FD_CLOEXEC flag
2952 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2953 flags &= ~fcntl.FD_CLOEXEC
2954 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002955
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002956 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002957
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002958 @unittest.skipIf(fcntl is None, "need fcntl")
2959 def test_set_inheritable_cloexec(self):
2960 fd = os.open(__file__, os.O_RDONLY)
2961 self.addCleanup(os.close, fd)
2962 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2963 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002964
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002965 os.set_inheritable(fd, True)
2966 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2967 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002968
Victor Stinnerdaf45552013-08-28 00:53:59 +02002969 def test_open(self):
2970 fd = os.open(__file__, os.O_RDONLY)
2971 self.addCleanup(os.close, fd)
2972 self.assertEqual(os.get_inheritable(fd), False)
2973
2974 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2975 def test_pipe(self):
2976 rfd, wfd = os.pipe()
2977 self.addCleanup(os.close, rfd)
2978 self.addCleanup(os.close, wfd)
2979 self.assertEqual(os.get_inheritable(rfd), False)
2980 self.assertEqual(os.get_inheritable(wfd), False)
2981
2982 def test_dup(self):
2983 fd1 = os.open(__file__, os.O_RDONLY)
2984 self.addCleanup(os.close, fd1)
2985
2986 fd2 = os.dup(fd1)
2987 self.addCleanup(os.close, fd2)
2988 self.assertEqual(os.get_inheritable(fd2), False)
2989
2990 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2991 def test_dup2(self):
2992 fd = os.open(__file__, os.O_RDONLY)
2993 self.addCleanup(os.close, fd)
2994
2995 # inheritable by default
2996 fd2 = os.open(__file__, os.O_RDONLY)
2997 try:
2998 os.dup2(fd, fd2)
2999 self.assertEqual(os.get_inheritable(fd2), True)
3000 finally:
3001 os.close(fd2)
3002
3003 # force non-inheritable
3004 fd3 = os.open(__file__, os.O_RDONLY)
3005 try:
3006 os.dup2(fd, fd3, inheritable=False)
3007 self.assertEqual(os.get_inheritable(fd3), False)
3008 finally:
3009 os.close(fd3)
3010
3011 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3012 def test_openpty(self):
3013 master_fd, slave_fd = os.openpty()
3014 self.addCleanup(os.close, master_fd)
3015 self.addCleanup(os.close, slave_fd)
3016 self.assertEqual(os.get_inheritable(master_fd), False)
3017 self.assertEqual(os.get_inheritable(slave_fd), False)
3018
3019
Brett Cannon3f9183b2016-08-26 14:44:48 -07003020class PathTConverterTests(unittest.TestCase):
3021 # tuples of (function name, allows fd arguments, additional arguments to
3022 # function, cleanup function)
3023 functions = [
3024 ('stat', True, (), None),
3025 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003026 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003027 ('chflags', False, (0,), None),
3028 ('lchflags', False, (0,), None),
3029 ('open', False, (0,), getattr(os, 'close', None)),
3030 ]
3031
3032 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003033 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003034 if os.name == 'nt':
3035 bytes_fspath = bytes_filename = None
3036 else:
3037 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003038 bytes_fspath = _PathLike(bytes_filename)
3039 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003040 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003041 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003042
Brett Cannonec6ce872016-09-06 15:50:29 -07003043 int_fspath = _PathLike(fd)
3044 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003045
3046 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3047 with self.subTest(name=name):
3048 try:
3049 fn = getattr(os, name)
3050 except AttributeError:
3051 continue
3052
Brett Cannon8f96a302016-08-26 19:30:11 -07003053 for path in (str_filename, bytes_filename, str_fspath,
3054 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003055 if path is None:
3056 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003057 with self.subTest(name=name, path=path):
3058 result = fn(path, *extra_args)
3059 if cleanup_fn is not None:
3060 cleanup_fn(result)
3061
3062 with self.assertRaisesRegex(
3063 TypeError, 'should be string, bytes'):
3064 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003065
3066 if allow_fd:
3067 result = fn(fd, *extra_args) # should not fail
3068 if cleanup_fn is not None:
3069 cleanup_fn(result)
3070 else:
3071 with self.assertRaisesRegex(
3072 TypeError,
3073 'os.PathLike'):
3074 fn(fd, *extra_args)
3075
3076
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003077@unittest.skipUnless(hasattr(os, 'get_blocking'),
3078 'needs os.get_blocking() and os.set_blocking()')
3079class BlockingTests(unittest.TestCase):
3080 def test_blocking(self):
3081 fd = os.open(__file__, os.O_RDONLY)
3082 self.addCleanup(os.close, fd)
3083 self.assertEqual(os.get_blocking(fd), True)
3084
3085 os.set_blocking(fd, False)
3086 self.assertEqual(os.get_blocking(fd), False)
3087
3088 os.set_blocking(fd, True)
3089 self.assertEqual(os.get_blocking(fd), True)
3090
3091
Yury Selivanov97e2e062014-09-26 12:33:06 -04003092
3093class ExportsTests(unittest.TestCase):
3094 def test_os_all(self):
3095 self.assertIn('open', os.__all__)
3096 self.assertIn('walk', os.__all__)
3097
3098
Victor Stinner6036e442015-03-08 01:58:04 +01003099class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003100 check_no_resource_warning = support.check_no_resource_warning
3101
Victor Stinner6036e442015-03-08 01:58:04 +01003102 def setUp(self):
3103 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003104 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003105 self.addCleanup(support.rmtree, self.path)
3106 os.mkdir(self.path)
3107
3108 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003109 path = self.bytes_path if isinstance(name, bytes) else self.path
3110 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003111 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003112 return filename
3113
3114 def get_entries(self, names):
3115 entries = dict((entry.name, entry)
3116 for entry in os.scandir(self.path))
3117 self.assertEqual(sorted(entries.keys()), names)
3118 return entries
3119
3120 def assert_stat_equal(self, stat1, stat2, skip_fields):
3121 if skip_fields:
3122 for attr in dir(stat1):
3123 if not attr.startswith("st_"):
3124 continue
3125 if attr in ("st_dev", "st_ino", "st_nlink"):
3126 continue
3127 self.assertEqual(getattr(stat1, attr),
3128 getattr(stat2, attr),
3129 (stat1, stat2, attr))
3130 else:
3131 self.assertEqual(stat1, stat2)
3132
3133 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003134 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003135 self.assertEqual(entry.name, name)
3136 self.assertEqual(entry.path, os.path.join(self.path, name))
3137 self.assertEqual(entry.inode(),
3138 os.stat(entry.path, follow_symlinks=False).st_ino)
3139
3140 entry_stat = os.stat(entry.path)
3141 self.assertEqual(entry.is_dir(),
3142 stat.S_ISDIR(entry_stat.st_mode))
3143 self.assertEqual(entry.is_file(),
3144 stat.S_ISREG(entry_stat.st_mode))
3145 self.assertEqual(entry.is_symlink(),
3146 os.path.islink(entry.path))
3147
3148 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3149 self.assertEqual(entry.is_dir(follow_symlinks=False),
3150 stat.S_ISDIR(entry_lstat.st_mode))
3151 self.assertEqual(entry.is_file(follow_symlinks=False),
3152 stat.S_ISREG(entry_lstat.st_mode))
3153
3154 self.assert_stat_equal(entry.stat(),
3155 entry_stat,
3156 os.name == 'nt' and not is_symlink)
3157 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3158 entry_lstat,
3159 os.name == 'nt')
3160
3161 def test_attributes(self):
3162 link = hasattr(os, 'link')
3163 symlink = support.can_symlink()
3164
3165 dirname = os.path.join(self.path, "dir")
3166 os.mkdir(dirname)
3167 filename = self.create_file("file.txt")
3168 if link:
3169 os.link(filename, os.path.join(self.path, "link_file.txt"))
3170 if symlink:
3171 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3172 target_is_directory=True)
3173 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3174
3175 names = ['dir', 'file.txt']
3176 if link:
3177 names.append('link_file.txt')
3178 if symlink:
3179 names.extend(('symlink_dir', 'symlink_file.txt'))
3180 entries = self.get_entries(names)
3181
3182 entry = entries['dir']
3183 self.check_entry(entry, 'dir', True, False, False)
3184
3185 entry = entries['file.txt']
3186 self.check_entry(entry, 'file.txt', False, True, False)
3187
3188 if link:
3189 entry = entries['link_file.txt']
3190 self.check_entry(entry, 'link_file.txt', False, True, False)
3191
3192 if symlink:
3193 entry = entries['symlink_dir']
3194 self.check_entry(entry, 'symlink_dir', True, False, True)
3195
3196 entry = entries['symlink_file.txt']
3197 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3198
3199 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003200 path = self.bytes_path if isinstance(name, bytes) else self.path
3201 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003202 self.assertEqual(len(entries), 1)
3203
3204 entry = entries[0]
3205 self.assertEqual(entry.name, name)
3206 return entry
3207
Brett Cannon96881cd2016-06-10 14:37:21 -07003208 def create_file_entry(self, name='file.txt'):
3209 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003210 return self.get_entry(os.path.basename(filename))
3211
3212 def test_current_directory(self):
3213 filename = self.create_file()
3214 old_dir = os.getcwd()
3215 try:
3216 os.chdir(self.path)
3217
3218 # call scandir() without parameter: it must list the content
3219 # of the current directory
3220 entries = dict((entry.name, entry) for entry in os.scandir())
3221 self.assertEqual(sorted(entries.keys()),
3222 [os.path.basename(filename)])
3223 finally:
3224 os.chdir(old_dir)
3225
3226 def test_repr(self):
3227 entry = self.create_file_entry()
3228 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3229
Brett Cannon96881cd2016-06-10 14:37:21 -07003230 def test_fspath_protocol(self):
3231 entry = self.create_file_entry()
3232 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3233
3234 def test_fspath_protocol_bytes(self):
3235 bytes_filename = os.fsencode('bytesfile.txt')
3236 bytes_entry = self.create_file_entry(name=bytes_filename)
3237 fspath = os.fspath(bytes_entry)
3238 self.assertIsInstance(fspath, bytes)
3239 self.assertEqual(fspath,
3240 os.path.join(os.fsencode(self.path),bytes_filename))
3241
Victor Stinner6036e442015-03-08 01:58:04 +01003242 def test_removed_dir(self):
3243 path = os.path.join(self.path, 'dir')
3244
3245 os.mkdir(path)
3246 entry = self.get_entry('dir')
3247 os.rmdir(path)
3248
3249 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3250 if os.name == 'nt':
3251 self.assertTrue(entry.is_dir())
3252 self.assertFalse(entry.is_file())
3253 self.assertFalse(entry.is_symlink())
3254 if os.name == 'nt':
3255 self.assertRaises(FileNotFoundError, entry.inode)
3256 # don't fail
3257 entry.stat()
3258 entry.stat(follow_symlinks=False)
3259 else:
3260 self.assertGreater(entry.inode(), 0)
3261 self.assertRaises(FileNotFoundError, entry.stat)
3262 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3263
3264 def test_removed_file(self):
3265 entry = self.create_file_entry()
3266 os.unlink(entry.path)
3267
3268 self.assertFalse(entry.is_dir())
3269 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3270 if os.name == 'nt':
3271 self.assertTrue(entry.is_file())
3272 self.assertFalse(entry.is_symlink())
3273 if os.name == 'nt':
3274 self.assertRaises(FileNotFoundError, entry.inode)
3275 # don't fail
3276 entry.stat()
3277 entry.stat(follow_symlinks=False)
3278 else:
3279 self.assertGreater(entry.inode(), 0)
3280 self.assertRaises(FileNotFoundError, entry.stat)
3281 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3282
3283 def test_broken_symlink(self):
3284 if not support.can_symlink():
3285 return self.skipTest('cannot create symbolic link')
3286
3287 filename = self.create_file("file.txt")
3288 os.symlink(filename,
3289 os.path.join(self.path, "symlink.txt"))
3290 entries = self.get_entries(['file.txt', 'symlink.txt'])
3291 entry = entries['symlink.txt']
3292 os.unlink(filename)
3293
3294 self.assertGreater(entry.inode(), 0)
3295 self.assertFalse(entry.is_dir())
3296 self.assertFalse(entry.is_file()) # broken symlink returns False
3297 self.assertFalse(entry.is_dir(follow_symlinks=False))
3298 self.assertFalse(entry.is_file(follow_symlinks=False))
3299 self.assertTrue(entry.is_symlink())
3300 self.assertRaises(FileNotFoundError, entry.stat)
3301 # don't fail
3302 entry.stat(follow_symlinks=False)
3303
3304 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003305 self.create_file("file.txt")
3306
3307 path_bytes = os.fsencode(self.path)
3308 entries = list(os.scandir(path_bytes))
3309 self.assertEqual(len(entries), 1, entries)
3310 entry = entries[0]
3311
3312 self.assertEqual(entry.name, b'file.txt')
3313 self.assertEqual(entry.path,
3314 os.fsencode(os.path.join(self.path, 'file.txt')))
3315
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003316 @unittest.skipUnless(os.listdir in os.supports_fd,
3317 'fd support for listdir required for this test.')
3318 def test_fd(self):
3319 self.assertIn(os.scandir, os.supports_fd)
3320 self.create_file('file.txt')
3321 expected_names = ['file.txt']
3322 if support.can_symlink():
3323 os.symlink('file.txt', os.path.join(self.path, 'link'))
3324 expected_names.append('link')
3325
3326 fd = os.open(self.path, os.O_RDONLY)
3327 try:
3328 with os.scandir(fd) as it:
3329 entries = list(it)
3330 names = [entry.name for entry in entries]
3331 self.assertEqual(sorted(names), expected_names)
3332 self.assertEqual(names, os.listdir(fd))
3333 for entry in entries:
3334 self.assertEqual(entry.path, entry.name)
3335 self.assertEqual(os.fspath(entry), entry.name)
3336 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3337 if os.stat in os.supports_dir_fd:
3338 st = os.stat(entry.name, dir_fd=fd)
3339 self.assertEqual(entry.stat(), st)
3340 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3341 self.assertEqual(entry.stat(follow_symlinks=False), st)
3342 finally:
3343 os.close(fd)
3344
Victor Stinner6036e442015-03-08 01:58:04 +01003345 def test_empty_path(self):
3346 self.assertRaises(FileNotFoundError, os.scandir, '')
3347
3348 def test_consume_iterator_twice(self):
3349 self.create_file("file.txt")
3350 iterator = os.scandir(self.path)
3351
3352 entries = list(iterator)
3353 self.assertEqual(len(entries), 1, entries)
3354
3355 # check than consuming the iterator twice doesn't raise exception
3356 entries2 = list(iterator)
3357 self.assertEqual(len(entries2), 0, entries2)
3358
3359 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003360 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003361 self.assertRaises(TypeError, os.scandir, obj)
3362
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003363 def test_close(self):
3364 self.create_file("file.txt")
3365 self.create_file("file2.txt")
3366 iterator = os.scandir(self.path)
3367 next(iterator)
3368 iterator.close()
3369 # multiple closes
3370 iterator.close()
3371 with self.check_no_resource_warning():
3372 del iterator
3373
3374 def test_context_manager(self):
3375 self.create_file("file.txt")
3376 self.create_file("file2.txt")
3377 with os.scandir(self.path) as iterator:
3378 next(iterator)
3379 with self.check_no_resource_warning():
3380 del iterator
3381
3382 def test_context_manager_close(self):
3383 self.create_file("file.txt")
3384 self.create_file("file2.txt")
3385 with os.scandir(self.path) as iterator:
3386 next(iterator)
3387 iterator.close()
3388
3389 def test_context_manager_exception(self):
3390 self.create_file("file.txt")
3391 self.create_file("file2.txt")
3392 with self.assertRaises(ZeroDivisionError):
3393 with os.scandir(self.path) as iterator:
3394 next(iterator)
3395 1/0
3396 with self.check_no_resource_warning():
3397 del iterator
3398
3399 def test_resource_warning(self):
3400 self.create_file("file.txt")
3401 self.create_file("file2.txt")
3402 iterator = os.scandir(self.path)
3403 next(iterator)
3404 with self.assertWarns(ResourceWarning):
3405 del iterator
3406 support.gc_collect()
3407 # exhausted iterator
3408 iterator = os.scandir(self.path)
3409 list(iterator)
3410 with self.check_no_resource_warning():
3411 del iterator
3412
Victor Stinner6036e442015-03-08 01:58:04 +01003413
Ethan Furmancdc08792016-06-02 15:06:09 -07003414class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003415
3416 # Abstracted so it can be overridden to test pure Python implementation
3417 # if a C version is provided.
3418 fspath = staticmethod(os.fspath)
3419
Ethan Furmancdc08792016-06-02 15:06:09 -07003420 def test_return_bytes(self):
3421 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003422 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003423
3424 def test_return_string(self):
3425 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003426 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003427
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003428 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003429 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003430 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003431
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003432 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003433 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3434 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3435
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003436 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003437 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3438 self.assertTrue(issubclass(_PathLike, os.PathLike))
3439 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003440
Ethan Furmancdc08792016-06-02 15:06:09 -07003441 def test_garbage_in_exception_out(self):
3442 vapor = type('blah', (), {})
3443 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003444 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003445
3446 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003447 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003448
Brett Cannon044283a2016-07-15 10:41:49 -07003449 def test_bad_pathlike(self):
3450 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003451 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003452 # __fspath__ attribute that is not callable.
3453 c = type('foo', (), {})
3454 c.__fspath__ = 1
3455 self.assertRaises(TypeError, self.fspath, c())
3456 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003457 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003458 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003459
3460# Only test if the C version is provided, otherwise TestPEP519 already tested
3461# the pure Python implementation.
3462if hasattr(os, "_fspath"):
3463 class TestPEP519PurePython(TestPEP519):
3464
3465 """Explicitly test the pure Python implementation of os.fspath()."""
3466
3467 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003468
3469
Fred Drake2e2be372001-09-20 21:33:42 +00003470if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003471 unittest.main()