blob: bc1b0eeeac9004abe48372c1bcf737a0dd85e807 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
# True only when os.geteuid() exists and reports an effective UID of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only evaluated when the platform check succeeds.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    # Run the with-block under support.check_warnings(), passing it a
    # (msg_regex, DeprecationWarning) filter and forwarding *quiet*.
    expected_warning = (msg_regex, DeprecationWarning)
    with support.check_warnings(expected_warning, quiet=quiet):
        yield
90
91
def requires_os_func(name):
    # Build a unittest skip decorator: the decorated test only runs when
    # the os module has an attribute called *name*.
    is_available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(is_available, reason)
94
95
Brett Cannonec6ce872016-09-06 15:50:29 -070096class _PathLike(os.PathLike):
97
98 def __init__(self, path=""):
99 self.path = path
100
101 def __str__(self):
102 return str(self.path)
103
104 def __fspath__(self):
105 if isinstance(self.path, BaseException):
106 raise self.path
107 else:
108 return self.path
109
110
def create_file(filename, content=b'content'):
    # Mode "x" makes the open fail if *filename* already exists; the file
    # is opened in binary mode with buffering disabled (third argument 0).
    with open(filename, "xb", 0) as stream:
        stream.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Low-level file descriptor and path tests that operate on
    # support.TESTFN.

    def setUp(self):
        # lexists() rather than exists(): a dangling symlink left at TESTFN
        # (see test_symlink_keywords) must be removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same clean-up runs after each test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor so os.read() sees the data just written.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create TESTFN first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The source must disappear and the destination must now hold the
        # source's content.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # All os.open() parameters can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # All os.symlink() parameters can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    # Checks on the result objects returned by os.stat() and os.statvfs().

    def setUp(self):
        # Every test works on a fresh 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared body of the str and bytes variants below; *fname* is the
        # path handed to os.stat().
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes are truncated to int before being
                    # compared with the indexed value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other failure is a genuine error.  Swallowing it would
            # leave 'result' unbound and hide the real cause.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Propagate every other error instead of continuing with an
            # unbound 'result'.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # Helper: *result* must expose st_file_attributes as an int that
        # fits in 32 unsigned bits.
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertGreaterEqual(result.st_file_attributes, 0)
        self.assertLessEqual(result.st_file_attributes, 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
482
Victor Stinner47aacc82015-06-12 17:26:23 +0200483
class UtimeTests(unittest.TestCase):
    # Tests for os.utime(), run against a file inside a scratch directory.

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            # Remember the current setting and restore it after the test.
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Shared driver: call set_time(filename, (atime_ns, mtime_ns)) and
        # check that os.stat() reports those timestamps.  *filename*
        # defaults to self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Shared driver: set_time(filename) must stamp the file with the
        # current time; st_mtime is then compared with time.time() within
        # a platform-dependent tolerance.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        elif os.name == 'nt':
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            delta = 0.050
        else:
            delta = 0.010
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name of the volume holding *path* on
        # Windows (as reported by GetVolumeInformationW), or None when it
        # is unknown or the platform is not win32.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
675
676
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000677from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000679class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000681 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000682
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000683 def setUp(self):
684 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000685 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000686 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000687 for key, value in self._reference().items():
688 os.environ[key] = value
689
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000690 def tearDown(self):
691 os.environ.clear()
692 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000693 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000694 os.environb.clear()
695 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000696
Christian Heimes90333392007-11-01 19:08:42 +0000697 def _reference(self):
698 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
699
700 def _empty_mapping(self):
701 os.environ.clear()
702 return os.environ
703
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000704 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200705 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
706 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000707 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000708 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300709 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200710 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300711 value = popen.read().strip()
712 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000713
Xavier de Gayed1415312016-07-22 12:15:29 +0200714 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
715 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000716 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200717 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
718 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300719 it = iter(popen)
720 self.assertEqual(next(it), "line1\n")
721 self.assertEqual(next(it), "line2\n")
722 self.assertEqual(next(it), "line3\n")
723 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000724
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000725 # Verify environ keys and values from the OS are of the
726 # correct str type.
727 def test_keyvalue_types(self):
728 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000729 self.assertEqual(type(key), str)
730 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000731
Christian Heimes90333392007-11-01 19:08:42 +0000732 def test_items(self):
733 for key, value in self._reference().items():
734 self.assertEqual(os.environ.get(key), value)
735
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000736 # Issue 7310
737 def test___repr__(self):
738 """Check that the repr() of os.environ looks like environ({...})."""
739 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000740 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
741 '{!r}: {!r}'.format(key, value)
742 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000743
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000744 def test_get_exec_path(self):
745 defpath_list = os.defpath.split(os.pathsep)
746 test_path = ['/monty', '/python', '', '/flying/circus']
747 test_env = {'PATH': os.pathsep.join(test_path)}
748
749 saved_environ = os.environ
750 try:
751 os.environ = dict(test_env)
752 # Test that defaulting to os.environ works.
753 self.assertSequenceEqual(test_path, os.get_exec_path())
754 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
755 finally:
756 os.environ = saved_environ
757
758 # No PATH environment variable
759 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
760 # Empty PATH environment variable
761 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
762 # Supplied PATH environment variable
763 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
764
Victor Stinnerb745a742010-05-18 17:17:23 +0000765 if os.supports_bytes_environ:
766 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000767 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000768 # ignore BytesWarning warning
769 with warnings.catch_warnings(record=True):
770 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000771 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000772 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000773 pass
774 else:
775 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000776
777 # bytes key and/or value
778 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
779 ['abc'])
780 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
781 ['abc'])
782 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
783 ['abc'])
784
785 @unittest.skipUnless(os.supports_bytes_environ,
786 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000787 def test_environb(self):
788 # os.environ -> os.environb
789 value = 'euro\u20ac'
790 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000791 value_bytes = value.encode(sys.getfilesystemencoding(),
792 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000793 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000794 msg = "U+20AC character is not encodable to %s" % (
795 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000796 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000797 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000798 self.assertEqual(os.environ['unicode'], value)
799 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800
801 # os.environb -> os.environ
802 value = b'\xff'
803 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000804 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000805 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000806 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000807
Charles-François Natali2966f102011-11-26 11:32:46 +0100808 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
809 # #13415).
810 @support.requires_freebsd_version(7)
811 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100812 def test_unset_error(self):
813 if sys.platform == "win32":
814 # an environment variable is limited to 32,767 characters
815 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100816 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100817 else:
818 # "=" is not allowed in a variable name
819 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100820 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100821
Victor Stinner6d101392013-04-14 16:35:04 +0200822 def test_key_type(self):
823 missing = 'missingkey'
824 self.assertNotIn(missing, os.environ)
825
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200827 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200829 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200830
Victor Stinner839e5ea2013-04-14 16:43:03 +0200831 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200832 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200833 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200834 self.assertTrue(cm.exception.__suppress_context__)
835
Victor Stinner6d101392013-04-14 16:35:04 +0200836
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        """Call os.walk(), accepting 'follow_symlinks' for 'followlinks'."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree under TESTFN that every test walks."""
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp3          (held in the variable tmp5_path)
        #           link/           a symlink to TESTFN/TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        tmp5_path = join(sub21_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            # SUB21 is created whether or not symlinks are available, so it
            # has to be listed here as well; it is dropped again below when
            # the directory turns out to be readable and is removed.
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            # Really unreadable: keep it, but make it removable at cleanup.
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            # chmod(0) did not block reading, so the "not readable" case
            # cannot be exercised: remove SUB21 and its expected entry.
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        walked[3 - 2 * flipped][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        walked[2 - 2 * flipped][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        # Rename SUB1 after it was listed but before the walk descends into
        # it, so the walk has to report an error for the vanished directory.
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001011
Charles-François Natali7372b062012-02-05 15:15:38 +01001012
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt fwalk() to walk()'s 3-tuples by dropping the directory fd."""
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        """Call os.fwalk(); subclasses override this to vary the input."""
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        # Work on copies: the caller may pass the same dict for both.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that failure behind a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1080
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Walk a bytes path, yielding str tuples like the base-class walk().

        Edits the consumer makes to the yielded lists are copied back into
        the bytes lists that os.walk() handed out.
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Runs when the consumer asks for the next item: re-encode
            # whatever is left in the (possibly pruned) str lists.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1093
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""

    def fwalk(self, top='.', *args, **kwargs):
        """Run os.fwalk() on a bytes path, yielding str names plus the fd.

        Edits the consumer makes to the yielded lists are copied back into
        the bytes lists that os.fwalk() handed out.
        """
        encoded_top = os.fsencode(top)
        for raw_root, raw_dirs, raw_files, topfd in os.fwalk(encoded_top, *args, **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files, topfd)
            # Runs when the consumer asks for the next item: re-encode
            # whatever is left in the (possibly pruned) str lists.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1105
Charles-François Natali7372b062012-02-05 15:15:38 +01001106
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The explicit mode applies to the leaf only; the parent is
                # expected to get 0o775 under the 0o002 umask.
                self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
                self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the previous umask even when an assertion
        # below fails, so a failure here cannot leak into later tests.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        with support.temp_umask(0o022):
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        # Put a regular file where makedirs() is asked to create a directory.
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1200
Andrew Svetlov405faed2012-12-25 12:18:09 +02001201
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a TESTFN directory shared by all tests."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Non-integer uid/gid values are rejected with TypeError; real
        # integers (including -1) are accepted and chown() returns None.
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        bad_values = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for bad in bad_values:
            self.assertRaises(TypeError, os.chown, target, bad, gid)
            self.assertRaises(TypeError, os.chown, target, uid, bad)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        # Switch the group to each of the first two candidate groups in turn
        # and check that stat() reports the change.
        target = support.TESTFN
        uid = os.stat(target).st_uid
        for wanted_gid in groups[:2]:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        # Switch the owner to each of the first two users in turn and check
        # that stat() reports the change.
        target = support.TESTFN
        gid = os.stat(target).st_gid
        for wanted_uid in all_users[:2]:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one assertRaises block: the test passes as soon
        # as either of them raises PermissionError.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)
1254
1255
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb hierarchy."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        """Create TESTFN/dira/dirb and return both paths as (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_nested()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_nested()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail without removing
        # anything.
        dira, dirb = self._make_nested()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1295
1296
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        # Writes are accepted and discarded.  The with statement closes the
        # file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reads hit end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001304
Andrew Svetlov405faed2012-12-25 12:18:09 +02001305
Guido van Rossume7ba4952007-06-06 23:52:48 +00001306class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001307 def test_urandom_length(self):
1308 self.assertEqual(len(os.urandom(0)), 0)
1309 self.assertEqual(len(os.urandom(1)), 1)
1310 self.assertEqual(len(os.urandom(10)), 10)
1311 self.assertEqual(len(os.urandom(100)), 100)
1312 self.assertEqual(len(os.urandom(1000)), 1000)
1313
1314 def test_urandom_value(self):
1315 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001316 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001317 data2 = os.urandom(16)
1318 self.assertNotEqual(data1, data2)
1319
1320 def get_urandom_subprocess(self, count):
1321 code = '\n'.join((
1322 'import os, sys',
1323 'data = os.urandom(%s)' % count,
1324 'sys.stdout.buffer.write(data)',
1325 'sys.stdout.buffer.flush()'))
1326 out = assert_python_ok('-c', code)
1327 stdout = out[1]
1328 self.assertEqual(len(stdout), 16)
1329 return stdout
1330
1331 def test_urandom_subprocess(self):
1332 data1 = self.get_urandom_subprocess(16)
1333 data2 = self.get_urandom_subprocess(16)
1334 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001335
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001336
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1378
1379
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = (
    sysconfig.get_config_var('HAVE_GETENTROPY') == 1
    or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
    or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)


@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for the file-descriptor based implementation of os.urandom().

    Every scenario runs in a child interpreter started through
    assert_python_ok(), so descriptor manipulation never affects the test
    runner itself.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)
        # The child writes exactly one 4-byte read to stdout; check it the
        # same way test_urandom_fd_reopened checks its own output.
        self.assertEqual(len(out), 4)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two reads inside one child must differ ...
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        # ... and so must the output of two separate children.
        self.assertNotEqual(out2, out)
1464
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001465
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The value yielded is the list of recorded calls.  On exit
    os.execv, os.execve and os.defpath are restored.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals *before* entering the try block: if one of these
    # lookups failed inside it, the finally clause would reference unbound
    # names and hide the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1498
Victor Stinner4659ccf2016-09-14 10:57:00 +02001499
class ExecTests(unittest.TestCase):
    """Argument validation of the os.exec*() family and os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program that cannot be found must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list, or an empty first argument, is rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        # Same validation as above, for os.execvpe() with and without env.
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError,
                              os.execvpe, 'notepad', bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() against the mocked execv()/execve().

        *test_type* is either str or bytes and selects the type used for the
        program name and paths.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected "native" path is bytes everywhere except on "nt".
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside of "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001571
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001572
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for invalid paths on Windows.

    Every test relies on support.TESTFN not existing when it starts; setUp()
    enforces that precondition.
    """

    def setUp(self):
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: nothing was left behind by an earlier test.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create a regular file, then try to create a directory with the
        # same name while the file is still open.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001607
Victor Stinnere77c9742016-03-25 10:28:23 +01001608
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001609class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001610 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001611 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1612 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001613 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001614 def get_single(f):
1615 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001616 if hasattr(os, f):
1617 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001618 return helper
1619 for f in singles:
1620 locals()["test_"+f] = get_single(f)
1621
Benjamin Peterson7522c742009-01-19 21:00:09 +00001622 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001623 try:
1624 f(support.make_bad_fd(), *args)
1625 except OSError as e:
1626 self.assertEqual(e.errno, errno.EBADF)
1627 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001628 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001629 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001630
Serhiy Storchaka43767632013-11-03 21:31:38 +02001631 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001632 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001633 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001634
Serhiy Storchaka43767632013-11-03 21:31:38 +02001635 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001636 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 fd = support.make_bad_fd()
1638 # Make sure none of the descriptors we are about to close are
1639 # currently valid (issue 6542).
1640 for i in range(10):
1641 try: os.fstat(fd+i)
1642 except OSError:
1643 pass
1644 else:
1645 break
1646 if i < 2:
1647 raise unittest.SkipTest(
1648 "Unable to acquire a range of invalid file descriptors")
1649 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001650
Serhiy Storchaka43767632013-11-03 21:31:38 +02001651 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001652 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001653 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001657 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001658
Serhiy Storchaka43767632013-11-03 21:31:38 +02001659 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001660 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001661 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001662
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001664 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 self.check(os.pathconf, "PC_NAME_MAX")
1666 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001667
Serhiy Storchaka43767632013-11-03 21:31:38 +02001668 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001669 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001670 self.check(os.truncate, 0)
1671 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001672
Serhiy Storchaka43767632013-11-03 21:31:38 +02001673 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001674 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001675 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001678 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001679 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001680
Victor Stinner57ddf782014-01-08 15:21:28 +01001681 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1682 def test_readv(self):
1683 buf = bytearray(10)
1684 self.check(os.readv, [buf])
1685
Serhiy Storchaka43767632013-11-03 21:31:38 +02001686 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001687 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001688 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001689
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001691 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001692 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001693
Victor Stinner57ddf782014-01-08 15:21:28 +01001694 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1695 def test_writev(self):
1696 self.check(os.writev, [b'abc'])
1697
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001698 def test_inheritable(self):
1699 self.check(os.get_inheritable)
1700 self.check(os.set_inheritable, True)
1701
1702 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1703 'needs os.get_blocking() and os.set_blocking()')
1704 def test_blocking(self):
1705 self.check(os.get_blocking)
1706 self.check(os.set_blocking, True)
1707
Brian Curtin1b9df392010-11-24 20:24:31 +00001708
1709class LinkTests(unittest.TestCase):
1710 def setUp(self):
1711 self.file1 = support.TESTFN
1712 self.file2 = os.path.join(support.TESTFN + "2")
1713
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001714 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001715 for file in (self.file1, self.file2):
1716 if os.path.exists(file):
1717 os.unlink(file)
1718
Brian Curtin1b9df392010-11-24 20:24:31 +00001719 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001720 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001721
Steve Dowercc16be82016-09-08 10:35:16 -07001722 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001723 with open(file1, "r") as f1, open(file2, "r") as f2:
1724 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1725
1726 def test_link(self):
1727 self._test_link(self.file1, self.file2)
1728
1729 def test_link_bytes(self):
1730 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1731 bytes(self.file2, sys.getfilesystemencoding()))
1732
Brian Curtinf498b752010-11-30 15:54:04 +00001733 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001734 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001735 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001736 except UnicodeError:
1737 raise unittest.SkipTest("Unable to encode for this platform.")
1738
Brian Curtinf498b752010-11-30 15:54:04 +00001739 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001740 self.file2 = self.file1 + "2"
1741 self._test_link(self.file1, self.file2)
1742
Serhiy Storchaka43767632013-11-03 21:31:38 +02001743@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1744class PosixUidGidTests(unittest.TestCase):
1745 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1746 def test_setuid(self):
1747 if os.getuid() != 0:
1748 self.assertRaises(OSError, os.setuid, 0)
1749 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001750
Serhiy Storchaka43767632013-11-03 21:31:38 +02001751 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1752 def test_setgid(self):
1753 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1754 self.assertRaises(OSError, os.setgid, 0)
1755 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001756
Serhiy Storchaka43767632013-11-03 21:31:38 +02001757 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1758 def test_seteuid(self):
1759 if os.getuid() != 0:
1760 self.assertRaises(OSError, os.seteuid, 0)
1761 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001762
Serhiy Storchaka43767632013-11-03 21:31:38 +02001763 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1764 def test_setegid(self):
1765 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1766 self.assertRaises(OSError, os.setegid, 0)
1767 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001768
Serhiy Storchaka43767632013-11-03 21:31:38 +02001769 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1770 def test_setreuid(self):
1771 if os.getuid() != 0:
1772 self.assertRaises(OSError, os.setreuid, 0, 0)
1773 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1774 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001775
Serhiy Storchaka43767632013-11-03 21:31:38 +02001776 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1777 def test_setreuid_neg1(self):
1778 # Needs to accept -1. We run this in a subprocess to avoid
1779 # altering the test runner's process state (issue8045).
1780 subprocess.check_call([
1781 sys.executable, '-c',
1782 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001783
Serhiy Storchaka43767632013-11-03 21:31:38 +02001784 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1785 def test_setregid(self):
1786 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1787 self.assertRaises(OSError, os.setregid, 0, 0)
1788 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1789 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001790
Serhiy Storchaka43767632013-11-03 21:31:38 +02001791 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1792 def test_setregid_neg1(self):
1793 # Needs to accept -1. We run this in a subprocess to avoid
1794 # altering the test runner's process state (issue8045).
1795 subprocess.check_call([
1796 sys.executable, '-c',
1797 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001798
Serhiy Storchaka43767632013-11-03 21:31:38 +02001799@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1800class Pep383Tests(unittest.TestCase):
1801 def setUp(self):
1802 if support.TESTFN_UNENCODABLE:
1803 self.dir = support.TESTFN_UNENCODABLE
1804 elif support.TESTFN_NONASCII:
1805 self.dir = support.TESTFN_NONASCII
1806 else:
1807 self.dir = support.TESTFN
1808 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001809
Serhiy Storchaka43767632013-11-03 21:31:38 +02001810 bytesfn = []
1811 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001812 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001813 fn = os.fsencode(fn)
1814 except UnicodeEncodeError:
1815 return
1816 bytesfn.append(fn)
1817 add_filename(support.TESTFN_UNICODE)
1818 if support.TESTFN_UNENCODABLE:
1819 add_filename(support.TESTFN_UNENCODABLE)
1820 if support.TESTFN_NONASCII:
1821 add_filename(support.TESTFN_NONASCII)
1822 if not bytesfn:
1823 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001824
Serhiy Storchaka43767632013-11-03 21:31:38 +02001825 self.unicodefn = set()
1826 os.mkdir(self.dir)
1827 try:
1828 for fn in bytesfn:
1829 support.create_empty_file(os.path.join(self.bdir, fn))
1830 fn = os.fsdecode(fn)
1831 if fn in self.unicodefn:
1832 raise ValueError("duplicate filename")
1833 self.unicodefn.add(fn)
1834 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001835 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001836 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001837
Serhiy Storchaka43767632013-11-03 21:31:38 +02001838 def tearDown(self):
1839 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001840
Serhiy Storchaka43767632013-11-03 21:31:38 +02001841 def test_listdir(self):
1842 expected = self.unicodefn
1843 found = set(os.listdir(self.dir))
1844 self.assertEqual(found, expected)
1845 # test listdir without arguments
1846 current_directory = os.getcwd()
1847 try:
1848 os.chdir(os.sep)
1849 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1850 finally:
1851 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001852
Serhiy Storchaka43767632013-11-03 21:31:38 +02001853 def test_open(self):
1854 for fn in self.unicodefn:
1855 f = open(os.path.join(self.dir, fn), 'rb')
1856 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001857
Serhiy Storchaka43767632013-11-03 21:31:38 +02001858 @unittest.skipUnless(hasattr(os, 'statvfs'),
1859 "need os.statvfs()")
1860 def test_statvfs(self):
1861 # issue #9645
1862 for fn in self.unicodefn:
1863 # should not fail with file not found error
1864 fullname = os.path.join(self.dir, fn)
1865 os.statvfs(fullname)
1866
1867 def test_stat(self):
1868 for fn in self.unicodefn:
1869 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001870
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll at most max_tries times, 0.1 second apart.  The else branch
        # runs when the loop ends without a break, i.e. the tries ran out or
        # the child exited before writing anything.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used in the failure message.
        """
        # One-byte tagged mapping polled below for the value 1.
        # NOTE(review): presumably win_console_handler.py opens the same tag
        # and writes 1 once its handler is installed -- confirm in that file.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test is done.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a plain
        # truth test would also treat exit code 0 as "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1985
1986
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, recording the bare names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01002033
2034
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file, directory and dangling symlinks on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than bare ``assert``
        # statements so that they are still enforced under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the dangling link has no target, so
        # exists() would report False for it.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        # Enforced even under ``python -O`` (a bare assert would not be).
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a symlink with a relative target must give the same
        # result as stat() on the target, whatever the current directory is.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002144
Brian Curtind40e6f72010-07-08 21:39:08 +00002145
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created through _winapi.CreateJunction."""

    # The junction name is relative, so it is created in the current
    # directory; its target is the directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions use unittest assertions rather than bare ``assert``
        # statements so that they are still enforced under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2175
2176
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is stripped under
        # ``python -O``, which would leave this test checking nothing.
        self.assertTrue(os.path.isdir(src))
2207
2208
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # Bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = os.fsencode(b'abc\xff')
        self.assertEqual(raw, b'abc\xff')
        text = os.fsdecode('abc\u0141')
        self.assertEqual(text, 'abc\u0141')

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # The filesystem encoding cannot represent this name.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002222
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002223
Brett Cannonefb00c02012-02-29 18:31:31 -05002224
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec Python knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        codec_info = codecs.lookup(name)
        self.assertTrue(codec_info)
2238
2239
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        command = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = _PathLike(command[0])
        pid = os.spawnv(os.P_NOWAIT, program, command)
        outcome = os.waitpid(pid, 0)
        self.assertEqual(outcome, (pid, 0))
2256
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002257
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv to run it.

        The script exits with ``self.exitcode``.  With *with_env*, a fresh
        environment containing a unique key is stored in ``self.env`` and
        the script additionally reads that key from ``os.environ``.  With
        *use_bytes*, the arguments (and ``self.env``, when *with_env* is
        also set) are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding it
            # unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers, the exit code is expected in the
            # high byte of the status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    # The *_noargs tests check that a missing argument list, or one whose
    # first item is an empty string, is rejected with ValueError.

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002382
Brian Curtin0151b8e2010-09-24 13:43:43 +00002383# The introduction of this TestCase caused at least two different errors on
2384# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin()."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
2391
2392
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        who = os.getpid()

        base = os.getpriority(which, who)
        os.setpriority(which, who, base + 1)
        try:
            new_prio = os.getpriority(which, who)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Put the original priority back; only an EACCES failure while
            # doing so is tolerated.
            try:
                os.setpriority(which, who, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2415
2416
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Threaded asyncore server that records what a client sends.

        The listening dispatcher runs its asyncore loop in its own thread.
        Each accepted connection gets a Handler, which greets the peer and
        buffers everything it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel collecting all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting; the client reads it to synchronize with us.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is meant to keep running."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to exit and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The context manager releases the lock even when
                # asyncore.loop() raises (handle_error re-raises).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2500
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002501
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as os.sendfile() fails with EAGAIN or EBUSY;
        any other OSError propagates.  Returns what os.sendfile() returns.
        *headers* and *trailers* are forwarded only when
        SUPPORT_HEADERS_TRAILERS is true.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN/EBUSY mean we have to retry sending the data;
                # everything else (e.g. ECONNRESET on disconnect) is fatal.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, hence the dict unpacking.)
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared instead of the payloads,
        # presumably to keep the failure output short -- confirm.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated for this call.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002688
2689
def supports_extended_attributes():
    """Return True if a user xattr can be set on a freshly created TESTFN.

    Returns False when os.setxattr is missing or when setting the
    attribute raises OSError.  The probe file is always removed again.
    """
    if hasattr(os, "setxattr"):
        probe = support.TESTFN
        try:
            # "x" mode: the probe file must not exist beforehand.
            with open(probe, "xb", 0) as stream:
                try:
                    os.setxattr(stream.fileno(), b"user.test", b"")
                except OSError:
                    supported = False
                else:
                    supported = True
        finally:
            support.unlink(probe)
        return supported

    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07002704
2705
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr on a scratch file."""

    def _expect_errno(self, expected_errno, call, *args, **kwargs):
        """Assert that ``call(*args, **kwargs)`` raises OSError with *expected_errno*."""
        with self.assertRaises(OSError) as caught:
            call(*args, **kwargs)
        self.assertEqual(caught.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the full xattr scenario against ``support.TESTFN``.

        *s* converts attribute names (``str`` or ``os.fsencode``); the four
        callables are the implementations under test; *kwargs* is forwarded
        to getxattr/setxattr/removexattr.
        """
        target = support.TESTFN
        self.addCleanup(support.unlink, target)
        create_file(target)

        # Nothing set yet: reading the attribute must fail.
        self._expect_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        baseline = listxattr(target)
        self.assertIsInstance(baseline, list)

        # Create the attribute with an empty value, then replace it.
        setxattr(target, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"")
        setxattr(target, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(target, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing attribute fails.
        self._expect_errno(errno.EEXIST,
                           setxattr, target, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)

        # XATTR_REPLACE on a missing attribute fails.
        self._expect_errno(errno.ENODATA,
                           setxattr, target, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(target, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(target)), expected)
        removexattr(target, s("user.test"), **kwargs)

        # The removed attribute can no longer be read.
        self._expect_errno(errno.ENODATA,
                           getxattr, target, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(target)), expected)
        self.assertEqual(getxattr(target, s("user.test2"), **kwargs), b"foo")

        big_value = b"a"*1024
        setxattr(target, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(target, s("user.test"), **kwargs), big_value)
        removexattr(target, s("user.test"), **kwargs)

        # Set many attributes and check they are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(target, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(target)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario once with str names and once with bytes names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            # Start the next pass (and leave the test) without the file.
            support.unlink(support.TESTFN)

    def test_simple(self):
        """Path-based calls with default arguments."""
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        """Path-based calls with ``follow_symlinks=False``."""
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        """File-descriptor based calls: each wrapper opens the path itself."""
        def fd_getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.setxattr(handle.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb", 0) as handle:
                os.removexattr(handle.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2789
2790
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size(self, *args):
        """Return ``os.get_terminal_size(*args)``, skipping when it cannot be queried.

        The test is skipped on any OSError under win32, and on EINVAL or
        ENOTTY elsewhere; other OSErrors propagate.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that stty's complaint about a
            # non-terminal stdin does not pollute the test output.
            output = subprocess.check_output(['stty', 'size'],
                                             stderr=subprocess.DEVNULL)
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # Missing, non-executable or failing stty: nothing to compare.
            self.skipTest("stty invocation failed")
        size = output.decode().split()
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2833
2834
class OSErrorTests(unittest.TestCase):
    """Check that OSError raised by path-taking os functions stores the very
    object that was passed as the path in its ``filename`` attribute."""

    def setUp(self):
        """Build the lists of str-like and bytes-like names to try."""
        class Str(str):
            pass

        text_name = support.TESTFN_UNENCODABLE
        if text_name is None:
            text_name = support.TESTFN
        self.unicode_filenames = [text_name, Str(text_name)]

        raw_name = support.TESTFN_UNDECODABLE
        if raw_name is None:
            raw_name = os.fsencode(support.TESTFN)
        self.bytes_filenames = [
            raw_name,
            bytearray(raw_name),
            memoryview(raw_name),
        ]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        """Call each function with each name and inspect the resulting error."""
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry is (names to try, function, *extra positional arguments).
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            # On win32 the destination type is matched to the source type,
            # and listdir is only tried with str names.
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    if isinstance(name, (str, bytes)):
                        func(name, *func_args)
                    else:
                        # Names that are neither str nor bytes are expected
                        # to trigger a DeprecationWarning as well.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as exc:
                    self.assertIs(exc.filename, name, str(func))
                    continue
                except UnicodeDecodeError:
                    continue
                self.fail("No exception thrown by {}".format(func))
2930
class CPUCountTests(unittest.TestCase):
    """Checks for os.cpu_count()."""

    def test_cpu_count(self):
        """cpu_count() is either None (test skipped) or a positive int."""
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2939
Victor Stinnerdaf45552013-08-28 00:53:59 +02002940
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of file descriptors created by os functions."""

    def _open_self(self):
        """Open this source file read-only; the descriptor is closed at cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def _check_inheritable(self, fd, expected):
        """Assert that os.get_inheritable(fd) equals *expected*."""
        self.assertEqual(os.get_inheritable(fd), expected)

    def test_get_set_inheritable(self):
        fd = self._open_self()
        self._check_inheritable(fd, False)

        os.set_inheritable(fd, True)
        self._check_inheritable(fd, True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_self()
        self._check_inheritable(fd, False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self._check_inheritable(fd, True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_self()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        self._check_inheritable(self._open_self(), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        for end in (rfd, wfd):
            self._check_inheritable(end, False)

    def test_dup(self):
        original = self._open_self()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self._check_inheritable(duplicate, False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_self()

        cases = (
            ({}, True),                       # inheritable by default
            ({'inheritable': False}, False),  # force non-inheritable
        )
        for dup2_kwargs, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(fd, target, **dup2_kwargs)
                self._check_inheritable(target, expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for end in (master_fd, slave_fd):
            self._check_inheritable(end, False)
3023
3024
class PathTConverterTests(unittest.TestCase):
    """Check which argument types the path-taking functions listed in
    ``functions`` accept and reject."""

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        """Try str, bytes, path-like and fd arguments on every listed function."""
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows.
            bytes_filename = None
            bytes_fspath = None
        else:
            bytes_filename = support.TESTFN.encode('ascii')
            bytes_fspath = _PathLike(bytes_filename)
        fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: close first, then unlink.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = _PathLike(fd)
        str_fspath = _PathLike(str_filename)

        accepted = [candidate
                    for candidate in (str_filename, bytes_filename,
                                      str_fspath, bytes_fspath)
                    if candidate is not None]

        def release(value):
            # Hand the result to the cleanup function, if one is configured.
            if cleanup_fn is not None:
                cleanup_fn(value)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                for path in accepted:
                    with self.subTest(name=name, path=path):
                        release(fn(path, *extra_args))

                # A path-like object whose __fspath__ gives an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    release(fn(fd, *extra_args))  # should not fail
3080
3081
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        """A freshly opened fd is blocking; the flag can be cleared and set again."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3095
3096
Yury Selivanov97e2e062014-09-26 12:33:06 -04003097
class ExportsTests(unittest.TestCase):
    """Checks on the contents of ``os.__all__``."""

    def test_os_all(self):
        """'open' and 'walk' are both listed in os.__all__."""
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3102
3103
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        """Create the scratch directory; it is removed again at cleanup."""
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the scratch directory and return its path.

        The returned path is bytes when *name* is bytes and str otherwise.
        The file itself is written by the module-level create_file() helper.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return ``{entry.name: entry}``.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries.keys()), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        With *skip_fields* true only the ``st_*`` attributes are compared
        and st_dev, st_ino and st_nlink are left out; otherwise the two
        objects must compare equal as a whole.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and against os.stat().

        *is_dir* and *is_file* are the expected results of the
        symlink-following entry.is_dir() / entry.is_file(); *is_symlink* is
        the expected result of entry.is_symlink().
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Verify the expectations passed in by the caller, so that a wrong
        # answer cannot hide behind an equally wrong os.stat() result.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of the scratch directory, which must be *name*."""
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create *name* in the scratch directory and return its DirEntry."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries.keys()),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def _check_removed_entry(self, entry):
        """Check inode() and stat() of an entry whose target has been removed."""
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_removed_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            return self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            for entry in entries:
                # When scanning by fd, the entry path is just the name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if os.stat in os.supports_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        # Dropping a partially consumed, unclosed iterator must warn.
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3417
Victor Stinner6036e442015-03-08 01:58:04 +01003418
class TestPEP519(unittest.TestCase):

    # Tests for os.fspath() and for the os.fsencode()/os.fsdecode() handling
    # of objects implementing __fspath__.
    #
    # The function under test is reached through this class attribute rather
    # than called directly, so that a subclass can swap in the pure Python
    # implementation when a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes input is handed back as an equal value.
        samples = (b'hello', b'goodbye', b'some/path/and/file')
        for raw in samples:
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str input is handed back as an equal value.
        samples = ('hello', 'goodbye', 'some/path/and/file')
        for text in samples:
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        # A _PathLike wrapping either str or bytes must be accepted by
        # fspath(), os.fsencode() and os.fsdecode().
        as_text = "path/like/object"
        as_bytes = b"path/like/object"
        for wrapped_value in (as_text, as_bytes):
            wrapper = _PathLike(wrapped_value)

            self.assertEqual(wrapped_value, self.fspath(wrapper))
            self.assertEqual(as_bytes, os.fsencode(wrapper))
            self.assertEqual(as_text, os.fsdecode(wrapper))

    def test_pathlike(self):
        wrapper = _PathLike('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(wrapper))
        # _PathLike must be recognised as an os.PathLike, both as a class
        # and through its instances.
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        self.assertTrue(isinstance(_PathLike(), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects that are not str, bytes or path-like are rejected with
        # TypeError.
        vapor = type('blah', (), {})
        rejected = (int, type, os, vapor())
        for candidate in rejected:
            with self.assertRaises(TypeError):
                self.fspath(candidate)

    def test_argument_required(self):
        # Calling without any argument is a TypeError.
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # The offending objects are created before each assertRaises block so
        # that only the fspath() call itself is checked for the exception.

        # __fspath__ returns a value other than str or bytes.
        returns_int = _PathLike(42)
        with self.assertRaises(TypeError):
            self.fspath(returns_int)

        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})()
        with self.assertRaises(TypeError):
            self.fspath(not_callable)

        # __fspath__ raises an exception.
        raises_zde = _PathLike(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(raises_zde)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003464
# The guard looks for the private os._fspath helper.  The intent is to run
# this extra class only if a C version of os.fspath() is provided; otherwise
# TestPEP519 has already exercised the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Re-run the TestPEP519 tests against the pure Python os._fspath()."""

        # Override the hook inherited from TestPEP519 so that every inherited
        # test calls os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003473
3474
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()