blob: 2de94c643d425ca25e1a305b46b4cae9219b6f12 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when the platform exposes os.geteuid() and the effective user
# id of the test process is 0 (root).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0
73
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False when sys.thread_info is missing or reports no version.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000082
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD, so platforms without it are safe.
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = (os.getgid() == 0)
else:
    HAVE_WHEEL_GROUP = False
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the managed block under support.check_warnings().

    The filter handed to check_warnings() pairs *msg_regex* with
    DeprecationWarning; *quiet* is forwarded unchanged.
    """
    warning_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(warning_filter, quiet=quiet):
        yield
91
92
@contextlib.contextmanager
def bytes_filename_warn(expected):
    """Context manager wrapping code that passes bytes filenames to os.

    Outside Windows (os.name != 'nt') the block simply runs.  On Windows the
    block runs under ignore_deprecation_warnings() for the "bytes API"
    deprecation message, with quiet set to ``not expected``.
    """
    if os.name != 'nt':
        yield
        return

    msg = 'The Windows bytes API has been deprecated'
    with ignore_deprecation_warnings(msg, quiet=not expected):
        yield
101
102
class _PathLike(os.PathLike):
    """Small os.PathLike implementation wrapping an arbitrary value.

    __fspath__() hands the wrapped value back unchanged, unless that value
    is an exception instance, in which case it is raised instead.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        wrapped = self.path
        return str(wrapped)

    def __fspath__(self):
        wrapped = self.path
        if not isinstance(wrapped, BaseException):
            return wrapped
        raise wrapped
116
117
def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) into it.

    The file is opened unbuffered in exclusive-creation binary mode ("xb"),
    so open() raises FileExistsError if the file already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
121
122
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions operating on support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  lexists() is used so that a symlink
        # whose target does not exist (see test_symlink_keywords) is removed
        # as well.  The same routine doubles as tearDown.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of the
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The source must be gone and the destination must hold its content.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
262
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200263
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Every test works on a 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def _statvfs_or_skip(self):
        """Return os.statvfs(self.fname), skipping the test on ENOSYS.

        Any other OSError is re-raised so the real failure is reported
        instead of a later error about an unbound result variable.
        """
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Verify the os.stat() result for *fname* (str or bytes path)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are truncated to int before being
                # compared with the indexed value.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing far out of range must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with bytes_filename_warn(True):
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Check that *result* has an int st_file_attributes in 32-bit range."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
469
Victor Stinner47aacc82015-06-12 17:26:23 +0200470
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() and the ways it accepts timestamps."""

    def setUp(self):
        # Work inside a scratch directory holding a single file "f1".
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[stat.ST_ATIME])
                or (st.st_mtime != st[stat.ST_MTIME])
                or (st.st_ctime != st[stat.ST_CTIME]))

    def _test_utime(self, set_time, filename=None):
        """Set timestamps through *set_time* and check os.stat() agrees.

        *set_time* is called as set_time(filename, (atime_ns, mtime_ns)).
        *filename* defaults to self.fname.
        """
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        """Check that set_time(self.fname) stamps the file with "now"."""
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        """Return the file system name of *path*'s volume, or None.

        Only implemented for Windows (via GetVolumeInformationW); None is
        returned on other platforms or when the query fails.
        """
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
659
660
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000661from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000662
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000663class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000664 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000665 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000666
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000667 def setUp(self):
668 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000669 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000670 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000671 for key, value in self._reference().items():
672 os.environ[key] = value
673
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000674 def tearDown(self):
675 os.environ.clear()
676 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000677 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000678 os.environb.clear()
679 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680
Christian Heimes90333392007-11-01 19:08:42 +0000681 def _reference(self):
682 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
683
684 def _empty_mapping(self):
685 os.environ.clear()
686 return os.environ
687
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000688 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200689 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
690 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000691 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000692 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300693 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200694 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300695 value = popen.read().strip()
696 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000697
Xavier de Gayed1415312016-07-22 12:15:29 +0200698 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
699 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000700 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200701 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
702 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300703 it = iter(popen)
704 self.assertEqual(next(it), "line1\n")
705 self.assertEqual(next(it), "line2\n")
706 self.assertEqual(next(it), "line3\n")
707 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000708
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000709 # Verify environ keys and values from the OS are of the
710 # correct str type.
711 def test_keyvalue_types(self):
712 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000713 self.assertEqual(type(key), str)
714 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000715
Christian Heimes90333392007-11-01 19:08:42 +0000716 def test_items(self):
717 for key, value in self._reference().items():
718 self.assertEqual(os.environ.get(key), value)
719
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000720 # Issue 7310
721 def test___repr__(self):
722 """Check that the repr() of os.environ looks like environ({...})."""
723 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000724 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
725 '{!r}: {!r}'.format(key, value)
726 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000727
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000728 def test_get_exec_path(self):
729 defpath_list = os.defpath.split(os.pathsep)
730 test_path = ['/monty', '/python', '', '/flying/circus']
731 test_env = {'PATH': os.pathsep.join(test_path)}
732
733 saved_environ = os.environ
734 try:
735 os.environ = dict(test_env)
736 # Test that defaulting to os.environ works.
737 self.assertSequenceEqual(test_path, os.get_exec_path())
738 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
739 finally:
740 os.environ = saved_environ
741
742 # No PATH environment variable
743 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
744 # Empty PATH environment variable
745 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
746 # Supplied PATH environment variable
747 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
748
Victor Stinnerb745a742010-05-18 17:17:23 +0000749 if os.supports_bytes_environ:
750 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000751 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000752 # ignore BytesWarning warning
753 with warnings.catch_warnings(record=True):
754 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000755 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000756 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000757 pass
758 else:
759 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000760
761 # bytes key and/or value
762 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
763 ['abc'])
764 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
765 ['abc'])
766 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
767 ['abc'])
768
769 @unittest.skipUnless(os.supports_bytes_environ,
770 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000771 def test_environb(self):
772 # os.environ -> os.environb
773 value = 'euro\u20ac'
774 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000775 value_bytes = value.encode(sys.getfilesystemencoding(),
776 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000777 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000778 msg = "U+20AC character is not encodable to %s" % (
779 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000780 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000781 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000782 self.assertEqual(os.environ['unicode'], value)
783 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000784
785 # os.environb -> os.environ
786 value = b'\xff'
787 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000788 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000789 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000790 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000791
Charles-François Natali2966f102011-11-26 11:32:46 +0100792 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
793 # #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    # Deleting a variable whose name the platform rejects must raise.
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        expected_error, bad_key = ValueError, 'x' * 50000
    else:
        # "=" is not allowed in a variable name
        expected_error, bad_key = OSError, 'key='
    self.assertRaises(expected_error, os.environ.__delitem__, bad_key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100805
Victor Stinner6d101392013-04-14 16:35:04 +0200806 def test_key_type(self):
807 missing = 'missingkey'
808 self.assertNotIn(missing, os.environ)
809
Victor Stinner839e5ea2013-04-14 16:43:03 +0200810 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200811 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200812 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200813 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200814
Victor Stinner839e5ea2013-04-14 16:43:03 +0200815 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200816 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200817 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200818 self.assertTrue(cm.exception.__suppress_context__)
819
Victor Stinner6d101392013-04-14 16:35:04 +0200820
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Thin adapter so that subclasses can run the same test bodies against
    # another walker: it accepts the 'follow_symlinks' spelling and
    # forwards it to os.walk() as 'followlinks'.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        testfn = support.TESTFN
        self.walk_path = os.path.join(testfn, "TEST1")
        self.sub1_path = os.path.join(self.walk_path, "SUB1")
        self.sub11_path = os.path.join(self.sub1_path, "SUB11")
        sub2_path = os.path.join(self.walk_path, "SUB2")
        self.link_path = os.path.join(sub2_path, "link")
        broken_link_path = os.path.join(sub2_path, "broken_link")
        t2_path = os.path.join(testfn, "TEST2")
        file_paths = (
            os.path.join(self.walk_path, "tmp1"),
            os.path.join(self.sub1_path, "tmp2"),
            os.path.join(sub2_path, "tmp3"),
            os.path.join(testfn, "TEST2", "tmp4"),
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, t2_path):
            os.makedirs(directory)

        for path in file_paths:
            # Mode "x" refuses to overwrite a leftover from an earlier run.
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # What a walk is expected to report for SUB2 depends on whether
        # the symlinks could be created.
        sub2_dirs, sub2_files = [], ["tmp3"]
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            sub2_dirs, sub2_files = ["link"], ["broken_link", "tmp3"]
        self.sub2_tree = (sub2_path, sub2_dirs, sub2_files)

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        if flipped:
            sub1_index, sub11_index, sub2_index = 2, 3, 1
        else:
            sub1_index, sub11_index, sub2_index = 1, 2, 3
        walked[0][1].sort()
        walked[sub2_index][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[sub1_index],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub11_index], (self.sub11_path, [], []))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        top = self.walk_path if walk_path is None else walk_path
        # Prune the search.
        visited = []
        for entry in self.walk(top):
            visited.append(entry)
            subdirs = entry[1]
            # Don't descend into SUB1.  Editing the list in place also
            # changes the entry that was just recorded in 'visited'.
            try:
                subdirs.remove('SUB1')
            except ValueError:
                pass

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (str(top), ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_file_like_path(self):
        # Same pruning scenario, with the top directory wrapped in the
        # module's _PathLike helper instead of given as a plain string.
        wrapped = _PathLike(self.walk_path)
        self.test_walk_prune(wrapped)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        if flipped:
            sub11_index, sub1_index, sub2_index = 1, 2, 0
        else:
            sub11_index, sub1_index, sub2_index = 0, 1, 2
        walked[3][1].sort()
        walked[sub2_index][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[sub11_index], (self.sub11_path, [], []))
        self.assertEqual(walked[sub1_index],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks, and stop at the first entry whose root
        # is the symlinked directory.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        found = next(
            (entry for entry in walk_it if entry[0] == self.link_path),
            None)
        if found is None:
            self.fail("Didn't follow symlink with followlinks=True")
        _, dirs, files = found
        self.assertEqual(dirs, [])
        self.assertEqual(files, ["tmp4"])

    def test_walk_bad_dir(self):
        # Walk top-down, renaming the first subdirectory after it has been
        # listed but before the walk descends into it.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)

        stale_path = os.path.join(root, dirs[0])
        renamed_path = os.path.join(root, dirs[0] + '.new')
        os.rename(stale_path, renamed_path)

        remaining_roots = [entry[0] for entry in walk_it]
        # The vanished directory is reported through onerror and neither
        # its old nor its new name is visited ...
        self.assertTrue(errors)
        self.assertNotIn(stale_path, remaining_roots)
        self.assertNotIn(renamed_path, remaining_roots)
        # ... while the other subdirectories are still walked.
        for other in dirs[1:]:
            self.assertIn(os.path.join(root, other), remaining_roots)
963
Charles-François Natali7372b062012-02-05 15:15:38 +0100964
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory descriptor so that the inherited WalkTests
        # bodies see the same 3-tuples as with os.walk().
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are
        run for every combination of topdown and symlink following; each
        root reported by fwalk() must have been reported by walk() with
        the same sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1029
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        super().setUp()
        # Context managers entered here are undone in tearDown().
        self.stack = contextlib.ExitStack()
        if os.name == 'nt':
            self.stack.enter_context(bytes_filename_warn(False))

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        # Run os.walk() on the bytes form of 'top' but hand str names to
        # the inherited test bodies.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (os.fsdecode(broot), dirs, files)
            # Copy whatever the caller did to the str lists back into the
            # bytes lists that os.walk() itself is using.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1052
Charles-François Natali7372b062012-02-05 15:15:38 +01001053
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a scratch directory TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion below fails, so
        # that a failure here cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        # makedirs() must fail when the target exists as a regular file,
        # whatever the value of exist_ok.
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails: tearDown() relies
        # on os.removedirs(), which cannot delete a directory that still
        # holds a file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1135
Andrew Svetlov405faed2012-12-25 12:18:09 +02001136
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    # All tests operate on one directory named TESTFN, created once for
    # the whole class and removed afterwards.

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Floats, complex numbers, Decimals and Fractions are rejected as
        # uid or gid; real integers (including -1) are accepted.
        st = os.stat(support.TESTFN)
        uid, gid = st.st_uid, st.st_gid
        non_integers = (-1.0, -1j, decimal.Decimal(-1),
                        fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        for owner, group in ((uid, gid), (-1, -1)):
            self.assertIsNone(os.chown(support.TESTFN, owner, group))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and read it back after each change.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and read it back after each change.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # NOTE(review): both calls share one assertRaises block, so only
        # one of them has to fail -- presumably because the current user
        # may itself be uid_1, making the first call a permitted no-op.
        # Confirm before splitting them.
        with self.assertRaises(PermissionError):
            for other_uid in (uid_1, uid_2):
                os.chown(support.TESTFN, other_uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1189
1190
class RemoveDirsTests(unittest.TestCase):
    # Tests for os.removedirs(), each run on a fresh TESTFN/dira/dirb tree.

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira and TESTFN/dira/dirb one level at a time and
        # return both paths, outer first.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in 'dira' stops the removal there.
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in the leaf directory makes removedirs() fail outright.
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1230
1231
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        # Writing to os.devnull succeeds (unbuffered here) and reading
        # from it returns no data.
        with open(os.devnull, 'wb', 0) as sink:
            sink.write(b'hello')
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001239
Andrew Svetlov405faed2012-12-25 12:18:09 +02001240
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child Python.

        The child writes the bytes to its standard output; the length of
        what was captured is checked against *count* before returning.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size, not a fixed 16, so that the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
1269 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001270
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001271
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Probe the call once; any OSError other than ENOSYS is a real
        # failure and is re-raised unchanged.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        # Asking for zero bytes yields an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        flag = os.GRND_NONBLOCK
        try:
            os.getrandom(1, flag)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive 16-byte results are expected to differ.
        first, second = os.getrandom(16), os.getrandom(16)
        self.assertNotEqual(first, second)
1312 self.assertNotEqual(data1, data2)
1313
1314
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the build-time configuration variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001321
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter so that closing or
    exhausting descriptors cannot damage the test process itself.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # The freshly opened file may already have been given the
                # descriptor number being replaced; duplicating a
                # descriptor onto itself is pointless and reportedly an
                # error with some libc implementations, so skip it.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two reads must differ from each other ...
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        # ... and from what a second child produces.
        self.assertNotEqual(out2, out)
1394
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001395
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit, including when
    the block raises.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals *before* entering the try block: if one of these
    # lookups fails, the finally clause must not run and trip over names
    # that were never bound (which would hide the real error).
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1428
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be executed must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must raise ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Exercise os._execvpe() with str or bytes program names (selected by
        # *test_type*) while os.execv/os.execve are replaced by recording
        # stubs from _execvpe_mockup().
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # Outside of Windows the recorded path is expected as bytes.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path = env.copy()
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised when os.name is not "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001492
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001493
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist
    (or, for mkdir, one that already exists as a file)."""

    def setUp(self):
        # Every test here relies on support.TESTFN being absent.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: nothing is in the way.
            return
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create TESTFN as a regular file ("x" = exclusive creation) and keep
        # it open while mkdir() is attempted on the same name.
        with open(support.TESTFN, "x"):
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001528
Victor Stinnere77c9742016-03-25 10:28:23 +01001529
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001530class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001531 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001532 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1533 #singles.append("close")
1534 #We omit close because it doesn'r raise an exception on some platforms
1535 def get_single(f):
1536 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001537 if hasattr(os, f):
1538 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001539 return helper
1540 for f in singles:
1541 locals()["test_"+f] = get_single(f)
1542
Benjamin Peterson7522c742009-01-19 21:00:09 +00001543 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001544 try:
1545 f(support.make_bad_fd(), *args)
1546 except OSError as e:
1547 self.assertEqual(e.errno, errno.EBADF)
1548 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001549 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001550 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001551
Serhiy Storchaka43767632013-11-03 21:31:38 +02001552 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001553 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001554 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001555
Serhiy Storchaka43767632013-11-03 21:31:38 +02001556 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001557 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001558 fd = support.make_bad_fd()
1559 # Make sure none of the descriptors we are about to close are
1560 # currently valid (issue 6542).
1561 for i in range(10):
1562 try: os.fstat(fd+i)
1563 except OSError:
1564 pass
1565 else:
1566 break
1567 if i < 2:
1568 raise unittest.SkipTest(
1569 "Unable to acquire a range of invalid file descriptors")
1570 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001571
Serhiy Storchaka43767632013-11-03 21:31:38 +02001572 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001573 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001574 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001575
Serhiy Storchaka43767632013-11-03 21:31:38 +02001576 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001577 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001578 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001579
Serhiy Storchaka43767632013-11-03 21:31:38 +02001580 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001581 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001582 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001583
Serhiy Storchaka43767632013-11-03 21:31:38 +02001584 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001585 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001586 self.check(os.pathconf, "PC_NAME_MAX")
1587 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001588
Serhiy Storchaka43767632013-11-03 21:31:38 +02001589 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001590 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001591 self.check(os.truncate, 0)
1592 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001593
Serhiy Storchaka43767632013-11-03 21:31:38 +02001594 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001595 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001596 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001597
Serhiy Storchaka43767632013-11-03 21:31:38 +02001598 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001599 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001600 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001601
Victor Stinner57ddf782014-01-08 15:21:28 +01001602 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1603 def test_readv(self):
1604 buf = bytearray(10)
1605 self.check(os.readv, [buf])
1606
Serhiy Storchaka43767632013-11-03 21:31:38 +02001607 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001608 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001609 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001610
Serhiy Storchaka43767632013-11-03 21:31:38 +02001611 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001612 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001613 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001614
Victor Stinner57ddf782014-01-08 15:21:28 +01001615 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1616 def test_writev(self):
1617 self.check(os.writev, [b'abc'])
1618
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001619 def test_inheritable(self):
1620 self.check(os.get_inheritable)
1621 self.check(os.set_inheritable, True)
1622
1623 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1624 'needs os.get_blocking() and os.set_blocking()')
1625 def test_blocking(self):
1626 self.check(os.get_blocking)
1627 self.check(os.set_blocking, True)
1628
Brian Curtin1b9df392010-11-24 20:24:31 +00001629
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and check that both names
        # open the same file.
        create_file(file1)

        with bytes_filename_warn(False):
            os.link(file1, file2)
        with open(file1, "r") as src, open(file2, "r") as dst:
            same = os.path.sameopenfile(src.fileno(), dst.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        fs_encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(fs_encoding),
                        self.file2.encode(fs_encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1664
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument and permission checks for the os.set*uid()/set*gid() calls."""

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # An out-of-range value is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        snippet = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', snippet])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # An out-of-range value is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        snippet = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', snippet])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001720
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Run os functions on a directory populated with non-ASCII file names."""

    def setUp(self):
        # Pick the first non-empty candidate for the directory name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate file names, keeping only those that
        # os.fsencode() can encode.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except:
            # tearDown() is not run when setUp() fails, so clean up here
            # before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each created file must succeed.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001792
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 s apart, while the child is alive.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without a break: either the tries
            # ran out or the child exited before reporting in.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # sets the shared one-byte mmap to 1, send it *event* via os.kill()
        # and fail unless the process has exited shortly afterwards.  *name*
        # is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  A plain
        # truth test would also treat an exit code of 0 as "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1907
1908
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Create SUB0, SUB1 (directories) and FILE0, FILE1 (files) under
        # support.TESTFN and remember their names in sorted order.
        self.created_paths = []
        for index in (0, 1):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n"
                             % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        with bytes_filename_warn(False):
            listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        path = '\\\\?\\' + os.path.abspath(support.TESTFN)
        listed = sorted(os.listdir(path))
        self.assertEqual(listed, self.created_paths)

        # bytes
        with bytes_filename_warn(False):
            path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
            listed = sorted(os.listdir(path))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(listed, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01001957
1958
1959@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001960@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001961class Win32SymlinkTests(unittest.TestCase):
1962 filelink = 'filelinktest'
1963 filelink_target = os.path.abspath(__file__)
1964 dirlink = 'dirlinktest'
1965 dirlink_target = os.path.dirname(filelink_target)
1966 missing_link = 'missing link'
1967
1968 def setUp(self):
1969 assert os.path.exists(self.dirlink_target)
1970 assert os.path.exists(self.filelink_target)
1971 assert not os.path.exists(self.dirlink)
1972 assert not os.path.exists(self.filelink)
1973 assert not os.path.exists(self.missing_link)
1974
1975 def tearDown(self):
1976 if os.path.exists(self.filelink):
1977 os.remove(self.filelink)
1978 if os.path.exists(self.dirlink):
1979 os.rmdir(self.dirlink)
1980 if os.path.lexists(self.missing_link):
1981 os.remove(self.missing_link)
1982
1983 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001984 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001985 self.assertTrue(os.path.exists(self.dirlink))
1986 self.assertTrue(os.path.isdir(self.dirlink))
1987 self.assertTrue(os.path.islink(self.dirlink))
1988 self.check_stat(self.dirlink, self.dirlink_target)
1989
1990 def test_file_link(self):
1991 os.symlink(self.filelink_target, self.filelink)
1992 self.assertTrue(os.path.exists(self.filelink))
1993 self.assertTrue(os.path.isfile(self.filelink))
1994 self.assertTrue(os.path.islink(self.filelink))
1995 self.check_stat(self.filelink, self.filelink_target)
1996
1997 def _create_missing_dir_link(self):
1998 'Create a "directory" link to a non-existent target'
1999 linkname = self.missing_link
2000 if os.path.lexists(linkname):
2001 os.remove(linkname)
2002 target = r'c:\\target does not exist.29r3c740'
2003 assert not os.path.exists(target)
2004 target_is_dir = True
2005 os.symlink(target, linkname, target_is_dir)
2006
2007 def test_remove_directory_link_to_missing_target(self):
2008 self._create_missing_dir_link()
2009 # For compatibility with Unix, os.remove will check the
2010 # directory status and call RemoveDirectory if the symlink
2011 # was created with target_is_dir==True.
2012 os.remove(self.missing_link)
2013
2014 @unittest.skip("currently fails; consider for improvement")
2015 def test_isdir_on_directory_link_to_missing_target(self):
2016 self._create_missing_dir_link()
2017 # consider having isdir return true for directory links
2018 self.assertTrue(os.path.isdir(self.missing_link))
2019
2020 @unittest.skip("currently fails; consider for improvement")
2021 def test_rmdir_on_directory_link_to_missing_target(self):
2022 self._create_missing_dir_link()
2023 # consider allowing rmdir to remove directory links
2024 os.rmdir(self.missing_link)
2025
def check_stat(self, link, target):
    """Check that stat() follows *link* to *target* and lstat() does not.

    The two comparisons are made with the str path first and then with
    the path encoded by os.fsencode(); each bytes-path comparison runs in
    its own ``bytes_filename_warn(True)`` context (a helper defined
    elsewhere in this file).
    """
    # str path
    self.assertEqual(os.stat(link), os.stat(target))
    self.assertNotEqual(os.lstat(link), os.stat(link))

    # bytes path; the two contexts are deliberately kept separate
    encoded_link = os.fsencode(link)
    with bytes_filename_warn(True):
        self.assertEqual(os.stat(encoded_link), os.stat(target))
    with bytes_filename_warn(True):
        self.assertNotEqual(os.lstat(encoded_link),
                            os.stat(encoded_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002035
def test_12084(self):
    # Layout: TESTFN/level2/level3 plus TESTFN/file1.  A relative symlink
    # to file1 is created inside level2 and then stat'ed from several
    # working directories.  The whole tree is removed on cleanup.
    top = os.path.abspath(support.TESTFN)
    middle = os.path.join(top, "level2")
    bottom = os.path.join(middle, "level3")
    self.addCleanup(support.rmtree, top)

    for directory in (top, middle, bottom):
        os.mkdir(directory)

    file1 = os.path.abspath(os.path.join(top, "file1"))
    create_file(file1)

    orig_dir = os.getcwd()
    try:
        os.chdir(middle)
        link = os.path.join(middle, "link")
        os.symlink(os.path.relpath(file1), "link")
        self.assertIn("link", os.listdir(os.getcwd()))

        # Check os.stat calls from the same dir as the link
        self.assertEqual(os.stat(file1), os.stat("link"))

        # Check os.stat calls from the parent of the link's directory
        # (level1) and then from a subdirectory of it (level3).
        for working_dir in (top, bottom):
            os.chdir(working_dir)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
    finally:
        # Always restore the working directory for the following tests.
        os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002070
Brian Curtind40e6f72010-07-08 21:39:08 +00002071
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    # Name of the junction the tests create (relative to the current
    # directory) and the directory it points at: the directory holding
    # this test file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions (not bare asserts) so the preconditions are
        # still verified under ``python -O``.
        self.assertTrue(os.path.exists(self.junction_target))
        # lexists() inspects the entry itself, so a left-over junction is
        # detected regardless of the state of its target.
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2101
2102
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the diagram contains a backslash, which would
        # otherwise form the invalid escape sequence ``\_``.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # unittest assertion instead of a bare ``assert`` so the check is
        # not stripped under ``python -O``.
        self.assertTrue(os.path.isdir(src))
2133
2134
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # fsencode() of bytes and fsdecode() of str hand the value back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ['unicode\u0141', 'latin\xe9', 'ascii']
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002148
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002149
Brett Cannonefb00c02012-02-29 18:31:31 -05002150
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            all(hasattr(locale, attr)
                for attr in ('nl_langinfo', 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be known to codecs.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2164
2165
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid; we are the parent of our
        # subprocess, so that must be our own pid.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        args = [sys.executable, '-c', 'pass']
        # Add an implicit test for PyUnicode_FSConverter().
        program = _PathLike(args[0])
        pid = os.spawnv(os.P_NOWAIT, program, args)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2182
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002183
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # Whatever login name is reported, it must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2192
2193
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own priority value by one, read it back, then try to
        # restore the starting value.
        which, who = os.PRIO_PROCESS, os.getpid()
        base = os.getpriority(which, who)
        os.setpriority(which, who, base + 1)
        try:
            new_prio = os.getpriority(which, who)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(which, who, base)
            except OSError as err:
                # EACCES while restoring the old value is tolerated;
                # every other error propagates.
                if err.errno != errno.EACCES:
                    raise
2216
2217
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher driven from its own thread.

        The accepted connection is wrapped in a Handler, kept in
        ``handler_instance``, which greets the peer and records every
        chunk it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as a single bytes."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Propagate the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind a listening TCP socket to *address* (host, port)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Actual address in use, as reported by the bound socket.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the thread and block until run() has signalled."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if the loop raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            # dispatcher.accept() is documented to return None when no
            # connection could be accepted; there is nothing to wrap then.
            pair = self.accept()
            if pair is None:
                return
            conn, _addr = pair
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            # The listening socket never has anything to write.
            return 0

        def handle_error(self):
            # Propagate the exception currently being handled.
            raise
2301
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002302
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Each test sends (part of) a file holding DATA over a client socket
    # with os.sendfile() to a SendfileTestServer started in setUp(), then
    # inspects what the server's handler received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms on which the tests pass headers/trailers to os.sendfile().
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Calls os.sendfile() until it stops failing with EAGAIN or EBUSY
        and returns its result; any other OSError propagates.  *headers*
        and *trailers* default to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.
        """
        # None sentinels instead of mutable default arguments.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN/EBUSY: we have to retry sending the data.
                # Anything else (e.g. ECONNRESET, disconnected) is an error.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a keyword, so it has to be passed through a dict).
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated for this call.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002489
2490
def supports_extended_attributes():
    """Return True if a "user.test" xattr can be set on a new TESTFN file.

    Returns False when os has no setxattr or when setting the attribute
    raises OSError.  The probe file is always removed afterwards.
    """
    if hasattr(os, "setxattr"):
        try:
            # "xb": create the probe file, unbuffered.
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    return False
        finally:
            support.unlink(support.TESTFN)
        return True
    return False
Larry Hastings9cf065c2012-06-22 16:30:09 -07002505
2506
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run one fixed scenario against the four supplied callables.
        # *s* converts each attribute name (str or os.fsencode, see
        # _check_xattrs); **kwargs is forwarded to every getxattr,
        # setxattr and removexattr call.
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        def expect_errno(code, func, *args):
            # func(fn, *args, **kwargs) must raise OSError with errno *code*.
            with self.assertRaises(OSError) as cm:
                func(fn, *args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        # Nothing set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Create an attribute with an empty value, then replace the value.
        setxattr(fn, s("user.test"), b"", **kwargs)
        expected = set(init_xattr)
        expected.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name / XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected)
        removexattr(fn, s("user.test"), **kwargs)

        # The removed attribute is gone, the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(fn, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), big_value)
        removexattr(fn, s("user.test"), **kwargs)

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with os.fsencode()d names,
        # removing the test file after each pass.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2590
2591
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is (callable, *arguments); every call receives the
        # bytes file name (os.getcwdb takes no argument).
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        for func, *args in calls:
            # ignore OSError, we only care about DeprecationWarning
            with bytes_filename_warn(True), contextlib.suppress(OSError):
                func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        self.addCleanup(support.unlink, support.TESTFN)

        bytes_name = os.fsencode(support.TESTFN)
        with bytes_filename_warn(True):
            os.symlink(bytes_name, bytes_name)
Victor Stinner28216442011-11-16 00:34:44 +01002630
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002631
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args) or skip the running test.

        The test is skipped when the call raises OSError on win32, or
        raises OSError with errno EINVAL or ENOTTY elsewhere; any other
        OSError propagates.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that a failing stty does not write
            # its error message into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty is missing, cannot be executed, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2674
2675
Victor Stinner292c8352012-10-30 02:17:38 +01002676class OSErrorTests(unittest.TestCase):
def setUp(self):
    # Build the file names handed to the OS calls under test: str
    # flavours (plain str and a str subclass) and bytes-like flavours
    # (bytes, bytearray, memoryview).
    class Str(str):
        pass

    # Prefer the special TESTFN_UNENCODABLE / TESTFN_UNDECODABLE names
    # when support provides them; otherwise fall back to TESTFN.
    decoded = (support.TESTFN
               if support.TESTFN_UNENCODABLE is None
               else support.TESTFN_UNENCODABLE)
    self.unicode_filenames = [decoded, Str(decoded)]

    encoded = (os.fsencode(support.TESTFN)
               if support.TESTFN_UNDECODABLE is None
               else support.TESTFN_UNDECODABLE)
    self.bytes_filenames = [encoded, bytearray(encoded),
                            memoryview(encoded)]

    self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002698
2699 def test_oserror_filename(self):
2700 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002701 (self.filenames, os.chdir,),
2702 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002703 (self.filenames, os.lstat,),
2704 (self.filenames, os.open, os.O_RDONLY),
2705 (self.filenames, os.rmdir,),
2706 (self.filenames, os.stat,),
2707 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002708 ]
2709 if sys.platform == "win32":
2710 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002711 (self.bytes_filenames, os.rename, b"dst"),
2712 (self.bytes_filenames, os.replace, b"dst"),
2713 (self.unicode_filenames, os.rename, "dst"),
2714 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002715 # Issue #16414: Don't test undecodable names with listdir()
2716 # because of a Windows bug.
2717 #
2718 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2719 # empty list (instead of failing), whereas os.listdir(b'\xff')
2720 # raises a FileNotFoundError. It looks like a Windows bug:
2721 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2722 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2723 # ERROR_PATH_NOT_FOUND (3).
2724 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002725 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002726 else:
2727 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002728 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002729 (self.filenames, os.rename, "dst"),
2730 (self.filenames, os.replace, "dst"),
2731 ))
2732 if hasattr(os, "chown"):
2733 funcs.append((self.filenames, os.chown, 0, 0))
2734 if hasattr(os, "lchown"):
2735 funcs.append((self.filenames, os.lchown, 0, 0))
2736 if hasattr(os, "truncate"):
2737 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002738 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002739 funcs.append((self.filenames, os.chflags, 0))
2740 if hasattr(os, "lchflags"):
2741 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002742 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002743 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002744 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002745 if sys.platform == "win32":
2746 funcs.append((self.bytes_filenames, os.link, b"dst"))
2747 funcs.append((self.unicode_filenames, os.link, "dst"))
2748 else:
2749 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002750 if hasattr(os, "listxattr"):
2751 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002752 (self.filenames, os.listxattr,),
2753 (self.filenames, os.getxattr, "user.test"),
2754 (self.filenames, os.setxattr, "user.test", b'user'),
2755 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002756 ))
2757 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002758 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002759 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002760 if sys.platform == "win32":
2761 funcs.append((self.unicode_filenames, os.readlink,))
2762 else:
2763 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002764
Victor Stinnerafe17062012-10-31 22:47:43 +01002765 for filenames, func, *func_args in funcs:
2766 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002767 try:
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002768 if isinstance(name, str):
Victor Stinner923590e2016-03-24 09:11:48 +01002769 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002770 elif isinstance(name, bytes):
2771 with bytes_filename_warn(False):
2772 func(name, *func_args)
2773 else:
2774 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2775 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002776 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002777 self.assertIs(err.filename, name)
2778 else:
2779 self.fail("No exception thrown by {}".format(func))
2780
class CPUCountTests(unittest.TestCase):
    """Sanity check of the value returned by os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        # Nothing to verify when no count is reported.
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2789
Victor Stinnerdaf45552013-08-28 00:53:59 +02002790
class FDInheritanceTests(unittest.TestCase):
    """Inheritable flag of file descriptors created through the os module."""

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed during
        # test cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear the FD_CLOEXEC flag through fcntl rather than through os
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.FD_CLOEXEC
        # FD_CLOEXEC is set on a freshly opened descriptor ...
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, cloexec)

        # ... and cleared once the descriptor is made inheritable.
        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_fds = os.pipe()
        for fd in pipe_fds:
            self.addCleanup(os.close, fd)
        for fd in pipe_fds:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        variants = (
            # inheritable by default
            ({}, True),
            # force non-inheritable
            ({'inheritable': False}, False),
        )
        for dup2_kwargs, expected in variants:
            target_fd = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source_fd, target_fd, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target_fd), expected)
            finally:
                os.close(target_fd)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
2873
2874
class PathTConverterTests(unittest.TestCase):
    """Exercise path argument handling of several os functions."""

    # Each entry: (name of the os function, whether a file descriptor is
    # accepted, extra positional arguments, callable used to clean up the
    # returned value or None).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def _call(self, fn, arg, extra_args, cleanup_fn):
        # Invoke fn and hand its result to cleanup_fn when one is given.
        outcome = fn(arg, *extra_args)
        if cleanup_fn is not None:
            cleanup_fn(outcome)

    def test_path_t_converter(self):
        text_name = support.TESTFN
        if os.name != 'nt':
            raw_name = support.TESTFN.encode('ascii')
            raw_pathlike = _PathLike(raw_name)
        else:
            # bytes variants are not exercised on Windows
            raw_name = raw_pathlike = None
        fd = os.open(_PathLike(text_name), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        # NOTE(review): _PathLike is defined elsewhere in this file;
        # presumably its __fspath__ returns the wrapped object as is.
        fd_pathlike = _PathLike(fd)
        text_pathlike = _PathLike(text_name)

        every_path = (text_name, raw_name, text_pathlike, raw_pathlike)
        usable_paths = [p for p in every_path if p is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # function not available in this os module
                    continue

                for path in usable_paths:
                    with self.subTest(name=name, path=path):
                        self._call(fn, path, extra_args, cleanup_fn)

                # A path-like wrapping an int is rejected.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(fd_pathlike, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    # should not fail
                    self._call(fn, fd, extra_args, cleanup_fn)
2930
2931
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip check of os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # State reported right after opening.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag after each change.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
2945
2946
Yury Selivanov97e2e062014-09-26 12:33:06 -04002947
class ExportsTests(unittest.TestCase):
    """Spot-check the contents of os.__all__."""

    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
2952
2953
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields."""

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        # Every test works in its own directory, removed again on cleanup.
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* inside the test directory and return its path.

        A bytes *name* is joined to the bytes form of the directory, so the
        returned path has the same type as *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        # Module-level helper defined elsewhere in this file; presumably
        # writes b'python' as the file content.
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the test directory and return a {name: DirEntry} dict.

        Fails the test unless the sorted entry names equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, the ``st_*`` attributes are compared one
        by one and st_dev, st_ino and st_nlink are left out.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and against os.stat().

        *is_dir* and *is_file* are the expected results of the symlink
        following entry.is_dir() / entry.is_file(); *is_symlink* is the
        expected result of entry.is_symlink().
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expectations stated by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check with a regular, symlink-following stat.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Cross-check with a stat that does not follow symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows some fields are skipped (see assert_stat_equal), except
        # when stat() had to follow a symlink.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink), already sorted by name
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            expected.extend((
                ('symlink_dir', True, False, True),
                ('symlink_file.txt', False, True, True),
            ))
        entries = self.get_entries([item[0] for item in expected])

        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the single entry of the test directory, named *name*.

        The directory is scanned through its bytes path when *name* is bytes.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry found for it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    @unittest.skipIf(os.name == "nt", "test requires bytes path support")
    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(self.bytes_path, bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target leaves the symlink dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            with bytes_filename_warn(True):
                self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        entries = list(os.scandir(self.bytes_path))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes directory yields bytes names and bytes paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # An explicit close() inside the with block must not be a problem.
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that was never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3245
Victor Stinner6036e442015-03-08 01:58:04 +01003246
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath(), os.PathLike, os.fsencode() and os.fsdecode()."""

    # Kept as an attribute so that a subclass can point the same tests at
    # another implementation (pure Python vs. C).
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        samples = (b'hello', b'goodbye', b'some/path/and/file')
        for raw in samples:
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        samples = ('hello', 'goodbye', 'some/path/and/file')
        for text in samples:
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for wrapped in ("path/like/object", b"path/like/object"):
            pathlike = _PathLike(wrapped)

            self.assertEqual(wrapped, self.fspath(pathlike))
            # Both helpers accept the path-like object, whichever type it wraps.
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        unwrapped = self.fspath(_PathLike('#feelthegil'))
        self.assertEqual('#feelthegil', unwrapped)
        self.assertTrue(issubclass(_PathLike, os.PathLike))
        self.assertIsInstance(_PathLike(), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        garbage = (int, type, os, vapor())
        for thing in garbage:
            self.assertRaises(TypeError, self.fspath, thing)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, _PathLike(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        self.assertRaises(TypeError, self.fspath, not_callable())
        # __fspath__ raises an exception.
        raising = _PathLike(ZeroDivisionError())
        self.assertRaises(ZeroDivisionError, self.fspath, raising)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003292
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # The inherited tests call self.fspath, so overriding it here runs
        # all of them against os._fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003301
3302
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()