blob: 66f563465fe3f9c32d3d6fc02c6cdd854da00797 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner138adb82015-06-12 22:01:54 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner138adb82015-06-12 22:01:54 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner138adb82015-06-12 22:01:54 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner138adb82015-06-12 22:01:54 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner138adb82015-06-12 22:01:54 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner138adb82015-06-12 22:01:54 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
# Optional modules: each block below falls back to a neutral value when the
# module it needs cannot be imported.
try:
    import grp
    # Look the user name up once rather than once per group entry.
    _current_user = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        # Make sure the gid of the running process is always listed.
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    # Without _testcapi both limits fall back to sys.maxsize.
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = False
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for os functions that create or operate on support.TESTFN."""

    def setUp(self):
        # lexists() also reports dangling symlinks, such as the link that
        # test_symlink_keywords() may leave behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal runs again after every test.
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when one of the checks above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with
        # os.fdopen(fd, *args); closing the file object closes the fd.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The existing destination must be overwritten by the source.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects of os.stat() and os.statvfs()."""

    def setUp(self):
        # Create the directory TESTFN containing a 3-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def _statvfs_or_skip(self):
        # Return os.statvfs(self.fname); skip the running test on ENOSYS.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure: let it propagate rather
            # than continue without a result.
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks on os.stat(fname); fname may be str or bytes.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Truncate the time attribute before comparing it with the
                # value obtained by index.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, unlike the cases above, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, unlike the cases above, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int within 32 bits.
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        result = os.stat(support.TESTFN)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
434
Victor Stinner138adb82015-06-12 22:01:54 +0200435
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() and the ways of passing it timestamps."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        # Register the removal before creating anything, so a partially
        # created tree is cleaned up too.
        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

        def restore_float_times(state):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)

                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)

            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() then reports those timestamps.  filename defaults to
        # self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb') as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname) and check that the resulting st_mtime
        # is close to the system clock read just before the call.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name reported by GetVolumeInformationW()
        # for the drive holding path; None off Windows or if the call fails.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
629
630
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000631from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000632
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000633class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000634 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000635 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000636
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000637 def setUp(self):
638 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000639 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000640 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000641 for key, value in self._reference().items():
642 os.environ[key] = value
643
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000644 def tearDown(self):
645 os.environ.clear()
646 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000647 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000648 os.environb.clear()
649 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000650
Christian Heimes90333392007-11-01 19:08:42 +0000651 def _reference(self):
652 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
653
654 def _empty_mapping(self):
655 os.environ.clear()
656 return os.environ
657
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000658 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300659 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000660 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000661 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300662 os.environ.update(HELLO="World")
663 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
664 value = popen.read().strip()
665 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000666
Ezio Melottic7e139b2012-09-26 20:01:34 +0300667 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000668 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300669 with os.popen(
670 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
671 it = iter(popen)
672 self.assertEqual(next(it), "line1\n")
673 self.assertEqual(next(it), "line2\n")
674 self.assertEqual(next(it), "line3\n")
675 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000676
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000677 # Verify environ keys and values from the OS are of the
678 # correct str type.
679 def test_keyvalue_types(self):
680 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000681 self.assertEqual(type(key), str)
682 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000683
Christian Heimes90333392007-11-01 19:08:42 +0000684 def test_items(self):
685 for key, value in self._reference().items():
686 self.assertEqual(os.environ.get(key), value)
687
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000688 # Issue 7310
689 def test___repr__(self):
690 """Check that the repr() of os.environ looks like environ({...})."""
691 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000692 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
693 '{!r}: {!r}'.format(key, value)
694 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000695
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000696 def test_get_exec_path(self):
697 defpath_list = os.defpath.split(os.pathsep)
698 test_path = ['/monty', '/python', '', '/flying/circus']
699 test_env = {'PATH': os.pathsep.join(test_path)}
700
701 saved_environ = os.environ
702 try:
703 os.environ = dict(test_env)
704 # Test that defaulting to os.environ works.
705 self.assertSequenceEqual(test_path, os.get_exec_path())
706 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
707 finally:
708 os.environ = saved_environ
709
710 # No PATH environment variable
711 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
712 # Empty PATH environment variable
713 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
714 # Supplied PATH environment variable
715 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
716
Victor Stinnerb745a742010-05-18 17:17:23 +0000717 if os.supports_bytes_environ:
718 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000719 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000720 # ignore BytesWarning warning
721 with warnings.catch_warnings(record=True):
722 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000723 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000724 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000725 pass
726 else:
727 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000728
729 # bytes key and/or value
730 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
731 ['abc'])
732 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
733 ['abc'])
734 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
735 ['abc'])
736
737 @unittest.skipUnless(os.supports_bytes_environ,
738 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000739 def test_environb(self):
740 # os.environ -> os.environb
741 value = 'euro\u20ac'
742 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000743 value_bytes = value.encode(sys.getfilesystemencoding(),
744 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000745 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000746 msg = "U+20AC character is not encodable to %s" % (
747 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000748 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000749 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(os.environ['unicode'], value)
751 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000752
753 # os.environb -> os.environ
754 value = b'\xff'
755 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000757 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000758 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000759
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name from os.environ must raise."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    with self.assertRaises(expected_error):
        del os.environ[bad_key]
Victor Stinner60b385e2011-11-22 22:01:28 +0100773
Victor Stinner6d101392013-04-14 16:35:04 +0200774 def test_key_type(self):
775 missing = 'missingkey'
776 self.assertNotIn(missing, os.environ)
777
Victor Stinner839e5ea2013-04-14 16:43:03 +0200778 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200779 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200780 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200781 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200782
Victor Stinner839e5ea2013-04-14 16:43:03 +0200783 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200784 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200785 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200786 self.assertTrue(cm.exception.__suppress_context__)
787
Victor Stinner6d101392013-04-14 16:35:04 +0200788
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        """Walk *top* with os.walk(), accepting a ``follow_symlinks`` keyword.

        ``follow_symlinks`` is renamed to the ``followlinks`` keyword that
        os.walk() takes; every other keyword is passed through unchanged.
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree described below under support.TESTFN."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # "with" closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # The expected SUB2 entry depends on whether symlinks could be made.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.  Go through self.walk() (not os.walk() directly)
        # so that subclasses overriding walk() are exercised here as well.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # Symlinks to directories are listed in dirs but must be
                # removed as links, not as directories.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)

    def test_walk_bad_dir(self):
        # Walk top-down, renaming one pending directory after the first
        # step: the walk must report an error for it and skip it.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        dir1 = dirs[0]
        dir1new = dir1 + '.new'
        os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
        roots = [r for r, d, f in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(os.path.join(root, dir1), roots)
        self.assertNotIn(os.path.join(root, dir1new), roots)
        for dir2 in dirs[1:]:
            self.assertIn(os.path.join(root, dir2), roots)
942
Charles-François Natali7372b062012-02-05 15:15:38 +0100943
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Yield os.walk()-style triples from os.fwalk(), dropping the
        directory file descriptor that os.fwalk() adds."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then every combination of topdown
        and symlink-following is tried; each directory reported by
        os.fwalk() must match what os.walk() reported for it.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind an unbound "fd".
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup, removing entries relative to each directory's descriptor
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked rather than rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1021
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        """Build the tree, then silence DeprecationWarning when os.name
        is 'nt'; the filter state is undone in tearDown()."""
        super().setUp()
        self.stack = contextlib.ExitStack()
        if os.name == 'nt':
            self.stack.enter_context(warnings.catch_warnings())
            warnings.simplefilter("ignore", DeprecationWarning)

    def tearDown(self):
        """Restore the warning filters before removing the tree."""
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Run os.walk() on the bytes form of *top* and yield str triples."""
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy the caller's edits of the decoded lists back into the
            # bytes lists that os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1045
Charles-François Natali7372b062012-02-05 15:15:38 +01001046
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot change the umask seen by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        # makedirs() must fail on a regular file whatever exist_ok says.
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1128
Andrew Svetlov405faed2012-12-25 12:18:09 +02001129
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values are rejected; valid ones return None."""
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        bad_values = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in bad_values:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """Switch the group to each of two known groups in turn."""
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """Switch the owner to each of two known users in turn."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one assertRaises block on purpose: the second
        # runs only when the first did not raise (presumably when uid_1
        # already owns the directory -- confirm before splitting them).
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1182
1183
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertFalse(os.path.exists(dira))
        self.assertFalse(os.path.exists(support.TESTFN))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail outright.
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self.assertTrue(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))
1225
1226
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        """os.devnull accepts writes and reads back as empty."""
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
            # Explicit close inside the with block, as in the original test.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001234
Andrew Svetlov405faed2012-12-25 12:18:09 +02001235
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        # Two successive calls must not return the same bytes.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() produced by a child Python.

        The child writes the bytes to its stdout; the length of what is
        captured is checked against *count* before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare with the requested size rather than a fixed 16, so the
        # helper stays correct for any *count*.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001264
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001265
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001272
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it reads from a file descriptor.

    Each scenario runs in a child interpreter because it manipulates the
    process's file descriptors or resource limits.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The output must be random data, not the b"x" bytes of the file
        # that now sits on the old descriptor.
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1345
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001346
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001347@contextlib.contextmanager
1348def _execvpe_mockup(defpath=None):
1349 """
1350 Stubs out execv and execve functions when used as context manager.
1351 Records exec calls. The mock execv and execve functions always raise an
1352 exception as they would normally never return.
1353 """
1354 # A list of tuples containing (function name, first arg, args)
1355 # of calls to execv or execve that have been made.
1356 calls = []
1357
1358 def mock_execv(name, *args):
1359 calls.append(('execv', name, args))
1360 raise RuntimeError("execv called")
1361
1362 def mock_execve(name, *args):
1363 calls.append(('execve', name, args))
1364 raise OSError(errno.ENOTDIR, "execve called")
1365
1366 try:
1367 orig_execv = os.execv
1368 orig_execve = os.execve
1369 orig_defpath = os.defpath
1370 os.execv = mock_execv
1371 os.execve = mock_execve
1372 if defpath is not None:
1373 os.defpath = defpath
1374 yield calls
1375 finally:
1376 os.execv = orig_execv
1377 os.execve = orig_execve
1378 os.defpath = orig_defpath
1379
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the private os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be executed must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must raise ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants; *test_type* is either
        # str or bytes and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the mocked exec function is expected to be
            # handed the encoded (bytes) form of the path.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001443
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001444
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each operation on support.TESTFN below is expected to raise OSError."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # Create a regular file with the target name, so mkdir must fail.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            # Close before unlinking, then remove the file we created.
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001469
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions behave when given a bad descriptor."""

    # Functions that take just a file descriptor; a test_<name> method is
    # generated for each of them by the loop below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # "close" is omitted because it doesn't raise an exception on some
    # platforms.
    def get_single(f):
        def helper(self):
            # Functions missing from os on this platform are silently
            # skipped.
            func = getattr(os, f, None)
            if func is not None:
                self.check(func)
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require OSError with errno EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fstat succeeded: fd+i is a valid descriptor, stop here.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-or-fd variant and the fd-only variant are checked.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-or-fd variant and the fd-only variant are checked.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1568
Brian Curtin1b9df392010-11-24 20:24:31 +00001569
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create *file1*, hard-link it to *file2*, and verify that both
        # names refer to the same open file.
        with open(file1, "w") as source:
            source.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        # Skip when the file system encoding cannot represent the
        # character appended to the file names below.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1606
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error-path tests for the os.set*uid() / os.set*gid() family."""

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # An out-of-range value must be rejected in either position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # An out-of-range value must be rejected in either position.
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001662
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """File-name round-trip tests using non-ASCII / unencodable names."""

    def setUp(self):
        # Prefer the most exotic directory name the platform offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; the optional ones are only used when set.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names that can be encoded for the file system.
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for fn in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, fn))
                fn = os.fsdecode(fn)
                if fn in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(fn)
        except:
            # tearDown() does not run when setUp() fails, so clean up here
            # before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(current_directory)

    def test_open(self):
        # Every created file must be openable through its decoded name.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001734
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() against child processes on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 seconds apart.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # The loop ended without seeing the message: either the tries
            # ran out or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait for it
        # to flag readiness through a shared memory byte, send it *event*
        # via os.kill and fail if it is still running afterwards.  *name*
        # is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over instead of leaking it.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a
        # plain truth test would also treat an exit status of 0 as
        # "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1849
1850
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with two sub-directories and two files,
        # remembering their names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(
            listed,
            [os.fsencode(name) for name in self.created_paths])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(
            listed,
            [os.fsencode(name) for name in self.created_paths])
1895
1896
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link tests that only apply to Windows."""

    # Link names are created relative to the current directory; the
    # targets are this test file and the directory containing it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link name is taken.
        for target in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(target)
        for name in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(name)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the target of this link does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        link, target = self.dirlink, self.dirlink_target
        os.symlink(target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, target)

    def test_file_link(self):
        link, target = self.filelink, self.filelink_target
        os.symlink(target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isfile(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        # Third positional argument is target_is_directory.
        os.symlink(target, linkname, True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() on the link must match the target, while lstat() on the
        # link must differ; checked for both str and bytes link names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # Stat a relative symlink from the link's own directory, from the
        # directory above it and from the directory below it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        try:
            for directory in (level1, level2, level3):
                os.mkdir(directory)

            file1 = os.path.abspath(os.path.join(level1, "file1"))

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            os.remove(file1)
            shutil.rmtree(level1)
2014
Brian Curtind40e6f72010-07-08 21:39:08 +00002015
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junction points created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and the junction name is free.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        for predicate in (os.path.exists, os.path.isdir):
            self.assertTrue(predicate(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2045
2046
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check symlinks whose target is given relative to the link itself."""

    def setUp(self):
        # Raw docstring: the backslash in the diagram below would otherwise
        # be read as an (invalid) escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion rather than a bare assert statement, so
        # the check still runs when Python is started with -O.
        self.assertTrue(os.path.isdir(src))
2077
2078
class FSEncodingTests(unittest.TestCase):
    """Sanity checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # Values already of the target type must come back equal.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # Round trip: fsdecode(fsencode(name)) == name
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to compare.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002092
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002093
Brett Cannonefb00c02012-02-29 18:31:31 -05002094
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            all(hasattr(locale, attr) for attr in ('nl_langinfo', 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The reported name must be one the codecs registry can resolve.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2108
2109
class PidTests(unittest.TestCase):
    """Checks for os.getppid() and os.waitpid() using child interpreters."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        child_pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
        # The child runs "pass", so a zero status is expected.
        outcome = os.waitpid(child_pid, 0)
        self.assertEqual(outcome, (child_pid, 0))
2125
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002126
# When this TestCase was introduced it caused at least two different errors
# on the *nix buildbots, so it is skipped for the time being to let the
# buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check that os.getlogin() returns a non-empty name."""

    def test_getlogin(self):
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2135
2136
@unittest.skipUnless(
    all(hasattr(os, name) for name in ('getpriority', 'setpriority')),
    "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, original + 1)
        try:
            current = os.getpriority(os.PRIO_PROCESS, pid)
            if original >= 19 and current <= 19:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(current, original + 1)
        finally:
            # Try to put the original value back; only an EACCES failure
            # while doing so is tolerated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2159
2160
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher that polls in its own thread.

        An accepted connection is wrapped in ``Handler``, which stores
        everything it receives so a test can compare it with what was sent.
        """

        class Handler(asynchat.async_chat):
            """Connection handler that accumulates all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as it is connected.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the port may have been 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the polling loop in run() is meant to keep going."""
            return self._active

        def start(self):
            """Start the thread and block until run() has signalled it is up."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the polling loop to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # "with" releases the lock even when a handler raises during
                # the poll (handle_error re-raises); a manual
                # acquire()/release() pair would leave it held.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            # This dispatcher never reports itself as writable.
            return False

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2244
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002245
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile().

    Each test sends (parts of) a file holding ``DATA`` over a client socket
    connected to a fresh ``SendfileTestServer`` and then inspects what the
    server-side handler received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms on which the headers/trailers arguments are not exercised.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while the call fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET, i.e. disconnected) propagates.
        Returns whatever os.sendfile() returns.

        The defaults are immutable tuples rather than lists so no mutable
        object is shared between calls.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, so it has to be passed via a dict).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, as the other tests do)
        # instead of comparing hash() values.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before creating the file so it is removed
        # even if writing or sending fails part-way.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated for this call.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002432
2433
def supports_extended_attributes():
    """Return True if extended attributes look usable on this system.

    False is returned when ``os.setxattr`` is missing or when setting a
    ``user.test`` attribute on a scratch file fails with OSError.  The
    result is also False for kernel releases 2.6.x with x < 39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so "\d" is a regex class rather than an invalid string
    # escape; the dots are escaped so they only match a literal ".".
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2449
2450
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the get/set/remove/list scenario on a fresh scratch file.

        ``s`` converts a str attribute name to the type under test; the
        four callables are the functions (or wrappers) being exercised and
        ``kwargs`` is forwarded to getxattr/setxattr/removexattr.
        """
        fn = support.TESTFN
        open(fn, "wb").close()
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)
        setxattr(fn, s("user.test"), b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
        # XATTR_CREATE on an existing attribute must fail ...
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)
        # ... and so must XATTR_REPLACE on a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        removexattr(fn, s("user.test"), **kwargs)
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            # Convert through ``s`` like every other name in this scenario,
            # so the bytes pass sets these attributes by bytes name too.
            setxattr(fn, s(thing), b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario once with str names and once with bytes names."""
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers that open the path and pass the file descriptor to the
        # os function instead of the path.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb") as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb") as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2526
2527
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Calling these functions with a bytes filename must warn.

    DeprecationWarning is turned into an error, so every call is expected
    to raise it.
    """

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
2563
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002564
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), or skip the test.

        The test is skipped when the call raises OSError on win32, or with
        errno EINVAL/ENOTTY elsewhere; any other OSError propagates.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so a failing stty does not write its
            # diagnostics into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # Missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2607
2608
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the call."""

    def setUp(self):
        class Str(str):
            pass

        decoded = (support.TESTFN
                   if support.TESTFN_UNENCODABLE is None
                   else support.TESTFN_UNENCODABLE)
        encoded = (os.fsencode(support.TESTFN)
                   if support.TESTFN_UNDECODABLE is None
                   else support.TESTFN_UNDECODABLE)

        # Each name is tried as a plain object and as a second object type
        # (str subclass / memoryview).
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry: (names to try, function, extra positional args...)
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra args.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink))

        for names, func, *extra in funcs:
            for name in names:
                try:
                    func(name, *extra)
                except OSError as err:
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2705
class CPUCountTests(unittest.TestCase):
    """os.cpu_count() must give a positive int when it gives a value."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # Nothing to verify when no value is reported.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2714
Victor Stinnerdaf45552013-08-28 00:53:59 +02002715
class FDInheritanceTests(unittest.TestCase):
    """Tests for the inheritable flag of file descriptors."""

    def _open_this_file(self):
        """Open this source file read-only; the fd is closed on cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        # Register both cleanups before asserting anything.
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        source = self._open_this_file()

        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        cases = (
            ({}, True),                       # inheritable by default
            ({'inheritable': False}, False),  # force non-inheritable
        )
        for dup2_kwargs, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(fd, target, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pair = os.openpty()
        # Register both cleanups before asserting anything.
        for fd in pair:
            self.addCleanup(os.close, fd)
        for fd in pair:
            self.assertEqual(os.get_inheritable(fd), False)
2798
2799
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)

        # A freshly opened descriptor is expected to report blocking mode.
        self.assertEqual(os.get_blocking(descriptor), True)

        # Switch to non-blocking and back, reading the mode after each change.
        for mode in (False, True):
            os.set_blocking(descriptor, mode)
            self.assertEqual(os.get_blocking(descriptor), mode)
2813
2814
Yury Selivanov97e2e062014-09-26 12:33:06 -04002815
class ExportsTests(unittest.TestCase):
    """Check that selected public names are listed in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
2820
2821
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the directory entries it yields."""

    def setUp(self):
        # Every test works inside its own scratch directory; the cleanup is
        # registered before mkdir() so the path is removed in all cases.
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create a small file called *name* in self.path; return its path."""
        filename = os.path.join(self.path, name)
        with open(filename, "wb") as fp:
            fp.write(b'python')
        return filename

    def get_entries(self, names):
        """Scan self.path and return a {name: entry} dict.

        *names* is the sorted list of entry names expected in the directory;
        the test fails if the directory content differs.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, the st_* attributes are compared one by
        one and st_dev, st_ino and st_nlink are left out of the comparison;
        otherwise the two objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check one directory entry against expectations and against os.stat().

        *is_dir* and *is_file* are the results expected from entry.is_dir()
        and entry.is_file() when symbolic links are followed; *is_symlink*
        is the result expected from entry.is_symlink().
        """
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Verify the kind the caller expects; these expectations used to be
        # accepted but never asserted.
        self.assertEqual(entry.is_dir(), is_dir, name)
        self.assertEqual(entry.is_file(), is_file, name)
        self.assertEqual(entry.is_symlink(), is_symlink, name)

        # Cross-check with what the file system reports when following links.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Same cross-check without following links.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows some stat fields are excluded from the comparison
        # (see assert_stat_equal).
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # Expected names, in sorted order.
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single entry of self.path, which must be called *name*."""
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        """Create the default file and return its directory entry."""
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            # always restore the working directory for the following tests
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs in that case
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # remove the target: the link is now dangling
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # a bytes path must produce bytes names and bytes paths
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)
3044
3045
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()