blob: 1746ca4525da40538035230e0dc9edc18bd33aef [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when os.geteuid() exists and reports the superuser (uid 0).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd')
                    else False)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Context manager wrapping support.check_warnings() for one filter.

    The filter is the pair (msg_regex, DeprecationWarning); *quiet* is
    forwarded unchanged to support.check_warnings().
    """
    deprecation_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(deprecation_filter, quiet=quiet):
        yield
91
92
def requires_os_func(name):
    """Return a decorator that skips a test unless the os module has *name*."""
    is_available = hasattr(os, name)
    return unittest.skipUnless(is_available, 'requires os.%s' % name)
95
96
class _PathLike(os.PathLike):
    """Small os.PathLike implementation used as a test double.

    The wrapped value is stored in ``self.path``.  If that value is an
    exception instance, __fspath__() raises it instead of returning it.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        wrapped = self.path
        if not isinstance(wrapped, BaseException):
            return wrapped
        # An exception instance was stored: raise it.
        raise wrapped
110
111
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* into it.

    The file is opened in exclusive-creation binary mode ("xb") with
    buffering disabled, so open() fails if the file already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
115
116
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions that operate on support.TESTFN."""

    def setUp(self):
        # lexists() is also true for a dangling symlink, such as the one
        # test_symlink_keywords() creates.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal is used as cleanup after each test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of
        # its path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor so that os.read() sees the data.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or a write fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(),
        # forwarding *args* (mode, buffering) to it.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists and must be overwritten.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
256
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200257
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        # Three bytes, so that st_size can be checked against 3.
        create_file(self.fname, b"ABC")

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks for test_stat_attributes() (str file name) and
        # test_stat_attributes_bytes() (bytes file name).
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are truncated to int before being
                # compared with the indexed value.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the too-short case, a successful call is not
        # reported as a failure here; only a TypeError is tolerated.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a real failure.  Swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the too-short case, a successful call is not
        # reported as a failure here; only a TypeError is tolerated.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a real failure.  Swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            ['icacls.exe', fname, '/deny', 'Users:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
481
Victor Stinner47aacc82015-06-12 17:26:23 +0200482
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a file inside a scratch directory."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        # The cleanup is registered before anything is created.
        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            # Remember the current setting so the cleanup can restore it.
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[stat.ST_ATIME])
                or (st.st_mtime != st[stat.ST_MTIME])
                or (st.st_ctime != st[stat.ST_CTIME]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() then reports those timestamps.  *filename* defaults to
        # the scratch file created by setUp().
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname) and check that the file's st_mtime is
        # close to the time.time() value sampled just before the call.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name of the volume holding *path* on
        # Windows (as reported by GetVolumeInformationW), else None.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
671
672
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000673from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000674
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000675class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000676 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000677 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000678
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000679 def setUp(self):
680 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000681 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000682 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000683 for key, value in self._reference().items():
684 os.environ[key] = value
685
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000686 def tearDown(self):
687 os.environ.clear()
688 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000689 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000690 os.environb.clear()
691 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000692
Christian Heimes90333392007-11-01 19:08:42 +0000693 def _reference(self):
694 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
695
696 def _empty_mapping(self):
697 os.environ.clear()
698 return os.environ
699
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000700 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200701 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
702 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000703 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000704 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300705 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200706 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300707 value = popen.read().strip()
708 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000709
Xavier de Gayed1415312016-07-22 12:15:29 +0200710 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
711 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000712 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200713 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
714 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300715 it = iter(popen)
716 self.assertEqual(next(it), "line1\n")
717 self.assertEqual(next(it), "line2\n")
718 self.assertEqual(next(it), "line3\n")
719 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000720
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000721 # Verify environ keys and values from the OS are of the
722 # correct str type.
723 def test_keyvalue_types(self):
724 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000725 self.assertEqual(type(key), str)
726 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000727
def test_items(self):
    """Each reference pair must be readable back through os.environ.get()."""
    reference = self._reference()
    for name in reference:
        self.assertEqual(os.environ.get(name), reference[name])
731
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000732 # Issue 7310
733 def test___repr__(self):
734 """Check that the repr() of os.environ looks like environ({...})."""
735 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000736 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
737 '{!r}: {!r}'.format(key, value)
738 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000739
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000740 def test_get_exec_path(self):
741 defpath_list = os.defpath.split(os.pathsep)
742 test_path = ['/monty', '/python', '', '/flying/circus']
743 test_env = {'PATH': os.pathsep.join(test_path)}
744
745 saved_environ = os.environ
746 try:
747 os.environ = dict(test_env)
748 # Test that defaulting to os.environ works.
749 self.assertSequenceEqual(test_path, os.get_exec_path())
750 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
751 finally:
752 os.environ = saved_environ
753
754 # No PATH environment variable
755 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
756 # Empty PATH environment variable
757 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
758 # Supplied PATH environment variable
759 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
760
Victor Stinnerb745a742010-05-18 17:17:23 +0000761 if os.supports_bytes_environ:
762 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000763 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000764 # ignore BytesWarning warning
765 with warnings.catch_warnings(record=True):
766 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000767 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000768 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000769 pass
770 else:
771 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000772
773 # bytes key and/or value
774 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
775 ['abc'])
776 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
777 ['abc'])
778 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
779 ['abc'])
780
@unittest.skipUnless(os.supports_bytes_environ,
                     "os.environb required for this test.")
def test_environb(self):
    """Writes through os.environ and os.environb must mirror each other."""
    fs_encoding = sys.getfilesystemencoding()

    # os.environ -> os.environb
    text = 'euro\u20ac'
    try:
        encoded = text.encode(fs_encoding, 'surrogateescape')
    except UnicodeEncodeError:
        self.skipTest(
            "U+20AC character is not encodable to %s" % (fs_encoding,))
    os.environ['unicode'] = text
    self.assertEqual(os.environ['unicode'], text)
    self.assertEqual(os.environb[b'unicode'], encoded)

    # os.environb -> os.environ
    raw = b'\xff'
    os.environb[b'bytes'] = raw
    self.assertEqual(os.environb[b'bytes'], raw)
    decoded = raw.decode(fs_encoding, 'surrogateescape')
    self.assertEqual(os.environ['bytes'], decoded)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000803
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name must raise, per platform."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, bad_key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100817
Victor Stinner6d101392013-04-14 16:35:04 +0200818 def test_key_type(self):
819 missing = 'missingkey'
820 self.assertNotIn(missing, os.environ)
821
Victor Stinner839e5ea2013-04-14 16:43:03 +0200822 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200823 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200824 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200825 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200826
Victor Stinner839e5ea2013-04-14 16:43:03 +0200827 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200828 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200829 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200830 self.assertTrue(cm.exception.__suppress_context__)
831
Victor Stinner6d101392013-04-14 16:35:04 +0200832
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Every test goes through self.walk() and spells the symlink option
    # "follow_symlinks"; this adapter maps it onto os.walk()'s
    # "followlinks" keyword so other walkers can reuse the same tests.
    def walk(self, top, **kwargs):
        try:
            follow = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        else:
            kwargs['followlinks'] = follow
        return os.walk(top, **kwargs)

    def setUp(self):
        """Build the directory tree walked by the tests (see layout below)."""
        self.addCleanup(support.rmtree, support.TESTFN)

        # Layout:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        top = support.TESTFN
        self.walk_path = os.path.join(top, "TEST1")
        self.sub1_path = os.path.join(self.walk_path, "SUB1")
        self.sub11_path = os.path.join(self.sub1_path, "SUB11")
        sub2_path = os.path.join(self.walk_path, "SUB2")
        self.link_path = os.path.join(sub2_path, "link")
        broken_link_path = os.path.join(sub2_path, "broken_link")
        test2_path = os.path.join(top, "TEST2")

        # Directories first, then the regular files that live in them.
        for directory in (self.sub11_path, sub2_path, test2_path):
            os.makedirs(directory)

        regular_files = (
            os.path.join(self.walk_path, "tmp1"),
            os.path.join(self.sub1_path, "tmp2"),
            os.path.join(sub2_path, "tmp3"),
            os.path.join(top, "TEST2", "tmp4"),
        )
        for path in regular_files:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # self.sub2_tree is the (root, dirs, files) triple expected for
        # SUB2; it depends on whether symlinks could be created.
        if not support.can_symlink():
            self.sub2_tree = (sub2_path, [], ["tmp3"])
            return
        os.symlink(os.path.abspath(test2_path), self.link_path)
        os.symlink('broken', broken_link_path, True)
        self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        top_entry = visited[0]
        flipped = top_entry[1][0] != "SUB1"
        top_entry[1].sort()
        sub2_index = 3 - 2 * flipped
        visited[sub2_index][-1].sort()
        self.assertEqual(top_entry,
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Removing in place also changes the list stored in
                # `visited`, since it is the same object.
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (str(walk_path), ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_file_like_path(self):
        # Re-run the pruning test with the start path wrapped in _PathLike.
        self.test_walk_prune(_PathLike(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        top_entry = visited[3]
        flipped = top_entry[1][0] != "SUB1"
        top_entry[1].sort()
        sub2_index = 2 - 2 * flipped
        visited[sub2_index][-1].sort()
        self.assertEqual(top_entry,
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks, and stop at the first entry rooted at
        # the symlinked directory.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        through_link = next(
            (entry for entry in walk_it if entry[0] == self.link_path), None)
        if through_link is None:
            self.fail("Didn't follow symlink with followlinks=True")
        _, dirs, files = through_link
        self.assertEqual(dirs, [])
        self.assertEqual(files, ["tmp4"])

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        # Rename the first subdirectory after it has been listed but
        # before the walk descends into it.
        victim = dirs[0]
        old_location = os.path.join(root, victim)
        new_location = os.path.join(root, victim + '.new')
        os.rename(old_location, new_location)
        roots = [r for r, d, f in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(old_location, roots)
        self.assertNotIn(new_location, roots)
        for untouched in dirs[1:]:
            self.assertIn(os.path.join(root, untouched), roots)
975
Charles-François Natali7372b062012-02-05 15:15:38 +0100976
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory fd so the inherited tests see the same
        # (root, dirs, files) triples that os.walk() produces.
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are
        run for every topdown / follow-symlinks combination and the
        per-directory contents are compared as sets.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if
        # os.open() itself fails there is nothing to close, and the
        # finally clause must not hit an unbound `fd`.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1041
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        super().setUp()
        self.stack = contextlib.ExitStack()

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Walk `top` as bytes while presenting str names to the caller."""
        try:
            follow = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        else:
            kwargs['followlinks'] = follow
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Copy any edits the caller made to the decoded lists back into
            # the lists owned by os.walk(), so pruning keeps working.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1062
Charles-François Natali7372b062012-02-05 15:15:38 +01001063
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when one of the checks below
        # fails, as test_exist_ok_s_isgid_directory does.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even if a check fails: tearDown() can only
        # remove directories and would otherwise error out on it.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1145
Andrew Svetlov405faed2012-12-25 12:18:09 +02001146
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory at TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # None of these are integers, so each must be rejected as uid and
        # as gid.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        uid = os.stat(support.TESTFN).st_uid
        # Switch to each of the first two groups in turn and read it back.
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        gid = os.stat(support.TESTFN).st_gid
        # Switch to each of the first two users in turn and read it back.
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1199
1200
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return (dira, dirb).
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain goes away.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the upward removal at dira.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes the call fail and remove nothing.
        dira, dirb = self._make_nested_dirs()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1240
1241
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with-statement
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from it yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001249
Andrew Svetlov405faed2012-12-25 12:18:09 +02001250
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return `count` bytes of os.urandom() output from a child Python.

        The child writes the raw bytes to its stdout; the captured output
        is checked to be exactly `count` bytes long before being returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size, not a fixed 16.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001280
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001281
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        # Probe the call once; any failure other than ENOSYS is a real
        # error and propagates.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive reads are expected to differ.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1323
1324
# True when the build configuration reports getentropy(), getrandom() or the
# getrandom() syscall as available: os.urandom() is then implemented without
# a file descriptor.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001331
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor."""

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is not
        # inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Within one child the two reads must differ, and two children
        # must not produce the same eight bytes.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1404
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001405
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Temporarily replace os.execv and os.execve with recording stubs.

    The context manager yields a list that receives one
    (function name, first arg, remaining args) tuple per stubbed call.
    The stubs always raise, since a real exec call would never return.
    If *defpath* is not None, os.defpath is overridden as well.  The
    original os attributes are put back when the block exits.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes before anything is patched.
    saved = (os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1438
Victor Stinner4659ccf2016-09-14 10:57:00 +02001439
class ExecTests(unittest.TestCase):
    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be resolved must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must raise ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with the program name given as *test_type*
        # (str or bytes) while os.execv/os.execve are replaced by the
        # recording stubs of _execvpe_mockup().
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Everywhere but on Windows the recorded path is expected as
            # bytes.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001503
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001504
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test expects OSError from an os call on support.TESTFN, which
    # setUp() requires to be absent.

    def setUp(self):
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: there is nothing at TESTFN.
            return
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # The path is created as a regular file first, so mkdir() on it
        # has to fail.
        with open(support.TESTFN, "x"):
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001539
Victor Stinnere77c9742016-03-25 10:28:23 +01001540
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001541class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001542 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001543 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1544 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001545 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001546 def get_single(f):
1547 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001548 if hasattr(os, f):
1549 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001550 return helper
1551 for f in singles:
1552 locals()["test_"+f] = get_single(f)
1553
Benjamin Peterson7522c742009-01-19 21:00:09 +00001554 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001555 try:
1556 f(support.make_bad_fd(), *args)
1557 except OSError as e:
1558 self.assertEqual(e.errno, errno.EBADF)
1559 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001560 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001561 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001562
Serhiy Storchaka43767632013-11-03 21:31:38 +02001563 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001564 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001565 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001566
Serhiy Storchaka43767632013-11-03 21:31:38 +02001567 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001568 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001569 fd = support.make_bad_fd()
1570 # Make sure none of the descriptors we are about to close are
1571 # currently valid (issue 6542).
1572 for i in range(10):
1573 try: os.fstat(fd+i)
1574 except OSError:
1575 pass
1576 else:
1577 break
1578 if i < 2:
1579 raise unittest.SkipTest(
1580 "Unable to acquire a range of invalid file descriptors")
1581 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001582
Serhiy Storchaka43767632013-11-03 21:31:38 +02001583 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001584 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001585 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001586
Serhiy Storchaka43767632013-11-03 21:31:38 +02001587 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001588 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001589 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001590
Serhiy Storchaka43767632013-11-03 21:31:38 +02001591 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001592 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001593 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001594
Serhiy Storchaka43767632013-11-03 21:31:38 +02001595 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001596 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001597 self.check(os.pathconf, "PC_NAME_MAX")
1598 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001599
Serhiy Storchaka43767632013-11-03 21:31:38 +02001600 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001601 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001602 self.check(os.truncate, 0)
1603 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001604
Serhiy Storchaka43767632013-11-03 21:31:38 +02001605 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001606 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001607 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001608
Serhiy Storchaka43767632013-11-03 21:31:38 +02001609 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001610 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001611 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001612
Victor Stinner57ddf782014-01-08 15:21:28 +01001613 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1614 def test_readv(self):
1615 buf = bytearray(10)
1616 self.check(os.readv, [buf])
1617
Serhiy Storchaka43767632013-11-03 21:31:38 +02001618 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001619 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001620 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001621
Serhiy Storchaka43767632013-11-03 21:31:38 +02001622 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001623 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001624 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001625
Victor Stinner57ddf782014-01-08 15:21:28 +01001626 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1627 def test_writev(self):
1628 self.check(os.writev, [b'abc'])
1629
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001630 def test_inheritable(self):
1631 self.check(os.get_inheritable)
1632 self.check(os.set_inheritable, True)
1633
1634 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1635 'needs os.get_blocking() and os.set_blocking()')
1636 def test_blocking(self):
1637 self.check(os.get_blocking)
1638 self.check(os.set_blocking, True)
1639
Brian Curtin1b9df392010-11-24 20:24:31 +00001640
class LinkTests(unittest.TestCase):
    # Tests for os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # The attributes are read here, not in setUp(), because
        # test_unicode_name() rebinds both of them.
        for path in (self.file1, self.file2):
            support.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # open the same file.
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, matching the other bytes-path tests in this file.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1674
Serhiy Storchaka43767632013-11-03 21:31:38 +02001675@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1676class PosixUidGidTests(unittest.TestCase):
1677 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1678 def test_setuid(self):
1679 if os.getuid() != 0:
1680 self.assertRaises(OSError, os.setuid, 0)
1681 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001682
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1684 def test_setgid(self):
1685 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1686 self.assertRaises(OSError, os.setgid, 0)
1687 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001688
Serhiy Storchaka43767632013-11-03 21:31:38 +02001689 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1690 def test_seteuid(self):
1691 if os.getuid() != 0:
1692 self.assertRaises(OSError, os.seteuid, 0)
1693 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001694
Serhiy Storchaka43767632013-11-03 21:31:38 +02001695 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1696 def test_setegid(self):
1697 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1698 self.assertRaises(OSError, os.setegid, 0)
1699 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001700
Serhiy Storchaka43767632013-11-03 21:31:38 +02001701 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1702 def test_setreuid(self):
1703 if os.getuid() != 0:
1704 self.assertRaises(OSError, os.setreuid, 0, 0)
1705 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1706 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001707
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1709 def test_setreuid_neg1(self):
1710 # Needs to accept -1. We run this in a subprocess to avoid
1711 # altering the test runner's process state (issue8045).
1712 subprocess.check_call([
1713 sys.executable, '-c',
1714 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001715
Serhiy Storchaka43767632013-11-03 21:31:38 +02001716 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1717 def test_setregid(self):
1718 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1719 self.assertRaises(OSError, os.setregid, 0, 0)
1720 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1721 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001722
Serhiy Storchaka43767632013-11-03 21:31:38 +02001723 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1724 def test_setregid_neg1(self):
1725 # Needs to accept -1. We run this in a subprocess to avoid
1726 # altering the test runner's process state (issue8045).
1727 subprocess.check_call([
1728 sys.executable, '-c',
1729 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001730
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    def setUp(self):
        # Use the first available of the unencodable, non-ASCII and plain
        # test names for the scratch directory.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names, kept only if they can be encoded.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Do not leave the directory behind when setUp() fails.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            without_argument = set(os.listdir())
            with_argument = set(os.listdir(os.sep))
            self.assertEqual(without_argument, with_argument)
        finally:
            os.chdir(current_directory)

    def test_open(self):
        # Opening each file must succeed; nothing is read from it.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001802
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # sets the shared one-byte mmap to 1, then send *event* via os.kill
        # and require the child to have exited shortly afterwards.  *name*
        # is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared memory once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # This branch is also reached when the child has already
            # exited, in which case there is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 also means that it has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1917
1918
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two files and
        # remember their names, sorted, for the comparisons below.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        path = '\\\\?\\' + absolute
        self.assertEqual(
            sorted(os.listdir(path)),
            self.created_paths)

        # bytes
        path = b'\\\\?\\' + os.fsencode(absolute)
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(sorted(os.listdir(path)), expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01001965
1966
1967@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001968@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001969class Win32SymlinkTests(unittest.TestCase):
1970 filelink = 'filelinktest'
1971 filelink_target = os.path.abspath(__file__)
1972 dirlink = 'dirlinktest'
1973 dirlink_target = os.path.dirname(filelink_target)
1974 missing_link = 'missing link'
1975
1976 def setUp(self):
1977 assert os.path.exists(self.dirlink_target)
1978 assert os.path.exists(self.filelink_target)
1979 assert not os.path.exists(self.dirlink)
1980 assert not os.path.exists(self.filelink)
1981 assert not os.path.exists(self.missing_link)
1982
1983 def tearDown(self):
1984 if os.path.exists(self.filelink):
1985 os.remove(self.filelink)
1986 if os.path.exists(self.dirlink):
1987 os.rmdir(self.dirlink)
1988 if os.path.lexists(self.missing_link):
1989 os.remove(self.missing_link)
1990
1991 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001992 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001993 self.assertTrue(os.path.exists(self.dirlink))
1994 self.assertTrue(os.path.isdir(self.dirlink))
1995 self.assertTrue(os.path.islink(self.dirlink))
1996 self.check_stat(self.dirlink, self.dirlink_target)
1997
1998 def test_file_link(self):
1999 os.symlink(self.filelink_target, self.filelink)
2000 self.assertTrue(os.path.exists(self.filelink))
2001 self.assertTrue(os.path.isfile(self.filelink))
2002 self.assertTrue(os.path.islink(self.filelink))
2003 self.check_stat(self.filelink, self.filelink_target)
2004
2005 def _create_missing_dir_link(self):
2006 'Create a "directory" link to a non-existent target'
2007 linkname = self.missing_link
2008 if os.path.lexists(linkname):
2009 os.remove(linkname)
2010 target = r'c:\\target does not exist.29r3c740'
2011 assert not os.path.exists(target)
2012 target_is_dir = True
2013 os.symlink(target, linkname, target_is_dir)
2014
2015 def test_remove_directory_link_to_missing_target(self):
2016 self._create_missing_dir_link()
2017 # For compatibility with Unix, os.remove will check the
2018 # directory status and call RemoveDirectory if the symlink
2019 # was created with target_is_dir==True.
2020 os.remove(self.missing_link)
2021
2022 @unittest.skip("currently fails; consider for improvement")
2023 def test_isdir_on_directory_link_to_missing_target(self):
2024 self._create_missing_dir_link()
2025 # consider having isdir return true for directory links
2026 self.assertTrue(os.path.isdir(self.missing_link))
2027
2028 @unittest.skip("currently fails; consider for improvement")
2029 def test_rmdir_on_directory_link_to_missing_target(self):
2030 self._create_missing_dir_link()
2031 # consider allowing rmdir to remove directory links
2032 os.rmdir(self.missing_link)
2033
2034 def check_stat(self, link, target):
2035 self.assertEqual(os.stat(link), os.stat(target))
2036 self.assertNotEqual(os.lstat(link), os.stat(link))
2037
Brian Curtind25aef52011-06-13 15:16:04 -05002038 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002039 self.assertEqual(os.stat(bytes_link), os.stat(target))
2040 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002041
2042 def test_12084(self):
2043 level1 = os.path.abspath(support.TESTFN)
2044 level2 = os.path.join(level1, "level2")
2045 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002046 self.addCleanup(support.rmtree, level1)
2047
2048 os.mkdir(level1)
2049 os.mkdir(level2)
2050 os.mkdir(level3)
2051
2052 file1 = os.path.abspath(os.path.join(level1, "file1"))
2053 create_file(file1)
2054
2055 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002056 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002057 os.chdir(level2)
2058 link = os.path.join(level2, "link")
2059 os.symlink(os.path.relpath(file1), "link")
2060 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002061
Victor Stinnerae39d232016-03-24 17:12:55 +01002062 # Check os.stat calls from the same dir as the link
2063 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002064
Victor Stinnerae39d232016-03-24 17:12:55 +01002065 # Check os.stat calls from a dir below the link
2066 os.chdir(level1)
2067 self.assertEqual(os.stat(file1),
2068 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002069
Victor Stinnerae39d232016-03-24 17:12:55 +01002070 # Check os.stat calls from a dir above the link
2071 os.chdir(level3)
2072 self.assertEqual(os.stat(file1),
2073 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002074 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002075 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002076
Brian Curtind40e6f72010-07-08 21:39:08 +00002077
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created with _winapi.CreateJunction."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        """Check that the target exists and the junction name is unused."""
        target_present = os.path.exists(self.junction_target)
        assert target_present
        junction_present = os.path.exists(self.junction)
        assert not junction_present

    def tearDown(self):
        """Remove the junction if a test left it behind."""
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def _make_junction(self):
        """Create the junction and check that it is visible."""
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

    def test_create_junction(self):
        """A new junction is a directory but is not reported as a link."""
        self._make_junction()
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        """os.unlink() deletes a junction."""
        self._make_junction()

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2107
2108
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Symlinks whose target is given relative to the link itself."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        """Remove the whole 'base' tree, including any link created in it."""
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is stripped when the
        # suite runs under ``python -O`` and the test would then check
        # nothing.
        self.assertTrue(os.path.isdir(src))
2139
2140
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        """Values already of the output type come back equal."""
        raw = b'abc\xff'
        self.assertEqual(os.fsencode(raw), raw)
        text = 'abc\u0141'
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        """fsdecode(fsencode(x)) == x for every name that can be encoded."""
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding; skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002154
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002155
Brett Cannonefb00c02012-02-29 18:31:31 -05002156
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    # Whether locale exposes both nl_langinfo and CODESET.
    _has_codeset = (hasattr(locale, 'nl_langinfo') and
                    hasattr(locale, 'CODESET'))
    # Whether fd 0 is a tty on a platform where test_device_encoding applies.
    _stdin_is_testable = os.isatty(0) and (
        sys.platform.startswith('win') or _has_codeset)

    def test_bad_fd(self):
        """An fd that does not exist yields None."""
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        _stdin_is_testable,
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        """The encoding reported for stdin is known to the codecs module."""
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        codec_info = codecs.lookup(encoding)
        self.assertTrue(codec_info)
2170
2171
class PidTests(unittest.TestCase):
    """Tests for process-id related functions."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        """A child's getppid() equals this process's getpid()."""
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        with subprocess.Popen(command, stdout=subprocess.PIPE) as child:
            output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        """waitpid() returns the spawned pid with an exit status of 0."""
        argv = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = _PathLike(argv[0])
        pid = os.spawnv(os.P_NOWAIT, program, argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2188
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002189
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a small script to TESTFN and return the argv that runs it.

        The script exits with ``self.exitcode``.  With *with_env*, a copy
        of os.environ plus one unique variable is stored in ``self.env``
        and the script reads that variable before exiting.  With
        *use_bytes*, the arguments (and the environment, if one was
        built) are converted with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create a unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when with_env was requested; encoding
            # it unconditionally raised AttributeError for
            # create_args(use_bytes=True).
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        """P_NOWAIT returns a pid whose wait status carries the exit code."""
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers the exit code is expected in the
            # high byte of the status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)
2286
Victor Stinner4659ccf2016-09-14 10:57:00 +02002287
Brian Curtin0151b8e2010-09-24 13:43:43 +00002288# The introduction of this TestCase caused at least two different errors on
2289# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        """getlogin() returns a non-empty name."""
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
2296
2297
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raising the priority value by one is visible via getpriority()."""
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
            cannot_tell = base >= 19 and new_prio <= 19
            if cannot_tell:
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original value back; EACCES is tolerated.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2320
2321
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """asyncore-based TCP server, run in its own thread.

        It keeps the handler of the most recently accepted connection in
        ``handler_instance`` so a test can read back what was received.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers every received chunk."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to finish and wait for the thread to end."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            """Poll asyncore until stopped or until no sockets remain."""
            self._active = True
            self.__flag.set()
            try:
                while self._active and asyncore.socket_map:
                    # The with-statement releases the lock even when the
                    # loop raises (handle_error re-raises).
                    with self._active_lock:
                        asyncore.loop(timeout=0.001, count=1)
            finally:
                # Close every asyncore socket on error exits too.
                asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2405
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002406
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(),
                         trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError propagates.  *headers* and *trailers* are only passed on
        when SUPPORT_HEADERS_TRAILERS is true.  Their defaults are empty
        tuples so that no mutable default object is shared between calls.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected, or any other real failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated with this flag.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002593
2594
def supports_extended_attributes():
    """Return True if a user xattr can be set on a new file at TESTFN.

    Returns False when os.setxattr is missing or when setting
    ``user.test`` raises OSError.  TESTFN is removed afterwards in every
    case.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07002609
2610
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario with attribute names converted by *s*.

        *s* turns a str attribute name into the type under test.  The four
        callables stand in for the os functions, and *kwargs* is forwarded
        to every get/set/remove call.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        first = s("user.test")
        second = s("user.test2")

        def expect_errno(code, func, *args):
            # func(*args, **kwargs) must raise OSError with this errno.
            with self.assertRaises(OSError) as caught:
                func(*args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, path, first)

        initial_names = listxattr(path)
        self.assertIsInstance(initial_names, list)

        # Create the first attribute, then replace its value.
        setxattr(path, first, b"", **kwargs)
        expected_names = set(initial_names)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(path)), expected_names)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name and XATTR_REPLACE refuses
        # a missing one.
        expect_errno(errno.EEXIST,
                     setxattr, path, first, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, path, second, b"bye", os.XATTR_REPLACE)

        setxattr(path, second, b"foo", os.XATTR_CREATE, **kwargs)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected_names)
        removexattr(path, first, **kwargs)

        # The removed attribute is gone and the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, path, first)

        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected_names)
        self.assertEqual(getxattr(path, second, **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, first, big_value, **kwargs)
        self.assertEqual(getxattr(path, first, **kwargs), big_value)
        removexattr(path, first, **kwargs)

        # Many attributes at once.
        many = sorted(["user.test{}".format(i) for i in range(100)])
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial_names) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then with fsencoded names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        """Run the same scenario through file descriptors, not paths."""
        def fd_getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2694
2695
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _size_or_skip(self, query):
        """Return query(), skipping the test when the size is unavailable.

        OSError skips the test on win32, or elsewhere when its errno is
        EINVAL or ENOTTY; any other OSError propagates.
        """
        try:
            return query()
        except OSError as exc:
            if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._size_or_skip(lambda: os.get_terminal_size())

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
            size = raw.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        rows, columns = size[0], size[1]
        expected = (int(columns), int(rows))  # reversed order

        actual = self._size_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
2738
2739
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        class Str(str):
            pass

        # Prefer the unencodable / undecodable test names when the support
        # module provides them; otherwise fall back to the plain TESTFN.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def _call_with_filename(self, func, name, func_args):
        # Plain str/bytes names are passed straight through; any other
        # filename type must additionally trigger a DeprecationWarning.
        if isinstance(name, (str, bytes)):
            func(name, *func_args)
            return
        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
            func(name, *func_args)

    def test_oserror_filename(self):
        # Each entry is (filenames to try, function, *extra arguments).
        everything = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        funcs = [
            (everything, os.chdir,),
            (everything, os.chmod, 0o777),
            (everything, os.lstat,),
            (everything, os.open, os.O_RDONLY),
            (everything, os.rmdir,),
            (everything, os.stat,),
            (everything, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir,),
            ]
        else:
            funcs += [
                (everything, os.listdir,),
                (everything, os.rename, "dst"),
                (everything, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra
        # positional arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((everything, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((everything, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (everything, os.listxattr,),
                (everything, os.getxattr, "user.test"),
                (everything, os.setxattr, "user.test", b'user'),
                (everything, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((everything, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = as_text if on_windows else everything
            funcs.append((readlink_names, os.readlink,))

        for candidates, func, *func_args in funcs:
            for name in candidates:
                try:
                    self._call_with_filename(func, name, func_args)
                except OSError as exc:
                    # The exception must carry the very object passed in.
                    self.assertIs(exc.filename, name, str(func))
                except RuntimeError as exc:
                    if not on_windows:
                        raise

                    # issue27781: undecodable bytes currently raise RuntimeError
                    # by 3.6.0b4 this will become UnicodeDecodeError or nothing
                    self.assertIsInstance(exc.__context__, UnicodeDecodeError)
                else:
                    self.fail("No exception thrown by {}".format(func))
2840
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns None when the count cannot be determined;
        # in that case there is nothing to verify.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2849
Victor Stinnerdaf45552013-08-28 00:53:59 +02002850
class FDInheritanceTests(unittest.TestCase):
    def _open_this_file(self):
        # Open this source file read-only and close the descriptor again
        # when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.FD_CLOEXEC
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, cloexec)

        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_fds = os.pipe()
        for fd in pipe_fds:
            self.addCleanup(os.close, fd)
        for fd in pipe_fds:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        duplicate = os.dup(source_fd)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        cases = (
            ({}, True),                        # inheritable by default
            ({'inheritable': False}, False),   # force non-inheritable
        )
        for dup2_kwargs, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(fd, target, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
2933
2934
class PathTConverterTests(unittest.TestCase):
    # Each entry: (function name, whether an fd argument is accepted,
    # additional positional arguments, cleanup function for the result).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def _call_and_cleanup(self, fn, arg, extra_args, cleanup_fn):
        # Call fn and, when a cleanup function is configured, hand it the
        # result (e.g. to close a descriptor returned by os.open).
        result = fn(arg, *extra_args)
        if cleanup_fn is not None:
            cleanup_fn(result)

    def test_path_t_converter(self):
        text_name = support.TESTFN
        if os.name == 'nt':
            bytes_name = bytes_pathlike = None
        else:
            bytes_name = support.TESTFN.encode('ascii')
            bytes_pathlike = _PathLike(bytes_name)
        fd = os.open(_PathLike(text_name), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        fd_pathlike = _PathLike(fd)
        text_pathlike = _PathLike(text_name)

        # Path spellings that must be accepted; the bytes variants are None
        # (and therefore dropped) on Windows.
        accepted = [candidate
                    for candidate in (text_name, bytes_name,
                                      text_pathlike, bytes_pathlike)
                    if candidate is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                if not hasattr(os, name):
                    continue
                fn = getattr(os, name)

                for path in accepted:
                    with self.subTest(name=name, path=path):
                        self._call_and_cleanup(fn, path, extra_args,
                                               cleanup_fn)

                # A path-like object wrapping an int is never acceptable.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(fd_pathlike, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(TypeError, 'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    # should not fail
                    self._call_and_cleanup(fn, fd, extra_args, cleanup_fn)
2990
2991
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # The freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, checking each change is reported.
        for blocking in (False, True):
            os.set_blocking(fd, blocking)
            self.assertEqual(os.get_blocking(fd), blocking)
3005
3006
Yury Selivanov97e2e062014-09-26 12:33:06 -04003007
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Spot-check two names that must be listed in os.__all__.
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
3012
3013
class TestScandir(unittest.TestCase):
    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Every test works inside its own directory, removed on cleanup.
        scratch = os.path.realpath(support.TESTFN)
        self.path = scratch
        self.bytes_path = os.fsencode(scratch)
        self.addCleanup(support.rmtree, scratch)
        os.mkdir(scratch)

    def _directory_for(self, name):
        # Return the test directory in the same type (bytes or str) as name.
        if isinstance(name, bytes):
            return self.bytes_path
        return self.path

    def _create_two_files(self):
        # Populate the test directory with two files.
        self.create_file("file.txt")
        self.create_file("file2.txt")

    def _check_stat_after_removal(self, entry):
        # Checks shared by the "removed" tests once the entry's target is
        # gone from the file system.
        if os.name == 'nt':
            with self.assertRaises(FileNotFoundError):
                entry.inode()
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            with self.assertRaises(FileNotFoundError):
                entry.stat()
            with self.assertRaises(FileNotFoundError):
                entry.stat(follow_symlinks=False)

    def create_file(self, name="file.txt"):
        # Create a small file called name in the test directory and return
        # its full path (bytes if name is bytes).
        target = os.path.join(self._directory_for(name), name)
        create_file(target, b'python')
        return target

    def get_entries(self, names):
        # Scan the test directory, check the sorted entry names against
        # names, and return a mapping of entry name to DirEntry.
        by_name = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(by_name), names)
        return by_name

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # With skip_fields false the two results must compare equal as a
        # whole; otherwise every st_* attribute except st_dev, st_ino and
        # st_nlink is compared individually.
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return
        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # NOTE: is_dir and is_file are accepted but not consulted here; the
        # expected values are derived from os.stat() / os.lstat() instead.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(), os.lstat(entry.path).st_ino)

        # Results when symbolic links are followed.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

        # Results for the entry itself, without following links.
        unfollowed = os.lstat(entry.path)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        on_windows = os.name == 'nt'
        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        can_link = hasattr(os, 'link')
        can_symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # (name, is_dir, is_file, is_symlink) for every entry created.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if can_link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expected.append(('link_file.txt', False, True, False))
        if can_symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([item[0] for item in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        # Return the one and only entry of the test directory, checking
        # that it is called name.
        found = list(os.scandir(self._directory_for(name)))
        self.assertEqual(len(found), 1)

        only = found[0]
        self.assertEqual(only.name, name)
        return only

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields for it.
        created = self.create_file(name=name)
        return self.get_entry(os.path.basename(created))

    def test_current_directory(self):
        filename = self.create_file()
        previous_cwd = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            listed = sorted(entry.name for entry in os.scandir())
            self.assertEqual(listed, [os.path.basename(filename)])
        finally:
            os.chdir(previous_cwd)

    def test_repr(self):
        file_entry = self.create_file_entry()
        self.assertEqual(repr(file_entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        file_entry = self.create_file_entry()
        expected = os.path.join(self.path, 'file.txt')
        self.assertEqual(os.fspath(file_entry), expected)

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        as_path = os.fspath(bytes_entry)
        self.assertIsInstance(as_path, bytes)
        expected = os.path.join(os.fsencode(self.path), bytes_filename)
        self.assertEqual(as_path, expected)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_stat_after_removal(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_stat_after_removal(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        # A bytes path must produce bytes names and paths.
        found = list(os.scandir(os.fsencode(self.path)))
        self.assertEqual(len(found), 1, found)
        entry = found[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        with self.assertRaises(FileNotFoundError):
            os.scandir('')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        scanner = os.scandir(self.path)

        first_pass = list(scanner)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise exception
        second_pass = list(scanner)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for bad_path in (1234, 1.234, {}, []):
            self.assertRaises(TypeError, os.scandir, bad_path)

    def test_close(self):
        self._create_two_files()
        scanner = os.scandir(self.path)
        next(scanner)
        scanner.close()
        # multiple closes
        scanner.close()
        with self.check_no_resource_warning():
            del scanner

    def test_context_manager(self):
        self._create_two_files()
        with os.scandir(self.path) as scanner:
            next(scanner)
        with self.check_no_resource_warning():
            del scanner

    def test_context_manager_close(self):
        self._create_two_files()
        with os.scandir(self.path) as scanner:
            next(scanner)
            scanner.close()

    def test_context_manager_exception(self):
        self._create_two_files()
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as scanner:
                next(scanner)
                1/0
        with self.check_no_resource_warning():
            del scanner

    def test_resource_warning(self):
        self._create_two_files()
        scanner = os.scandir(self.path)
        next(scanner)
        with self.assertWarns(ResourceWarning):
            del scanner
            support.gc_collect()
        # exhausted iterator
        scanner = os.scandir(self.path)
        list(scanner)
        with self.check_no_resource_warning():
            del scanner
3298
Victor Stinner6036e442015-03-08 01:58:04 +01003299
Ethan Furmancdc08792016-06-02 15:06:09 -07003300class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003301
3302 # Abstracted so it can be overridden to test pure Python implementation
3303 # if a C version is provided.
3304 fspath = staticmethod(os.fspath)
3305
Ethan Furmancdc08792016-06-02 15:06:09 -07003306 def test_return_bytes(self):
3307 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003308 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003309
3310 def test_return_string(self):
3311 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003312 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003313
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003314 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003315 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003316 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003317
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003318 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003319 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3320 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3321
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003322 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003323 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3324 self.assertTrue(issubclass(_PathLike, os.PathLike))
3325 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003326
Ethan Furmancdc08792016-06-02 15:06:09 -07003327 def test_garbage_in_exception_out(self):
3328 vapor = type('blah', (), {})
3329 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003330 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003331
3332 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003333 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003334
Brett Cannon044283a2016-07-15 10:41:49 -07003335 def test_bad_pathlike(self):
3336 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003337 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003338 # __fspath__ attribute that is not callable.
3339 c = type('foo', (), {})
3340 c.__fspath__ = 1
3341 self.assertRaises(TypeError, self.fspath, c())
3342 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003343 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003344 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003345
# os._fspath is only tested when a C version is provided; otherwise
# TestPEP519 has already exercised the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Run the TestPEP519 checks against the pure Python os._fspath()."""

        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003354
3355
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()