blob: 73db39c8258e915cfbc43eb9b57114b7a0d686a8 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner138adb82015-06-12 22:01:54 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner138adb82015-06-12 22:01:54 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner138adb82015-06-12 22:01:54 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner138adb82015-06-12 22:01:54 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner138adb82015-06-12 22:01:54 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner138adb82015-06-12 22:01:54 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import grp
50 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
51 if hasattr(os, 'getgid'):
52 process_gid = os.getgid()
53 if process_gid not in groups:
54 groups.append(process_gid)
55except ImportError:
56 groups = []
57try:
58 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
60except ImportError:
61 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted once the platform check has succeeded.
HAVE_WHEEL_GROUP = (
    (os.getgid() == 0) if sys.platform.startswith('freebsd') else False
)
84
Thomas Wouters0e3f5912006-08-11 14:57:12 +000085# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Exercise low-level os file functions against support.TESTFN."""

    def setUp(self):
        # lexists() rather than exists(): a dangling symlink left at TESTFN
        # (test_symlink_keywords links to a non-existent 'target') must be
        # removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same clean-up runs after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown can still unlink TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and expect exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args),
        and close the resulting file object."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The existing destination must be overwritten by the source.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter must be accepted by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter must be accepted by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226# Test attributes on return values from os.*stat* family.
227class StatAttributeTests(unittest.TestCase):
228 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 os.mkdir(support.TESTFN)
230 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000232 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235 def tearDown(self):
236 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000237 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238
Serhiy Storchaka43767632013-11-03 21:31:38 +0200239 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000240 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000241 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242
243 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(result[stat.ST_SIZE], 3)
245 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000246
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000247 # Make sure all the attributes are there
248 members = dir(result)
249 for name in dir(stat):
250 if name[:3] == 'ST_':
251 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000252 if name.endswith("TIME"):
253 def trunc(x): return int(x)
254 else:
255 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
Larry Hastings6fe20b32012-04-19 15:07:49 -0700260 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700261 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 for name in 'st_atime st_mtime st_ctime'.split():
263 floaty = int(getattr(result, name) * 100000)
264 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700265 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700266
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267 try:
268 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200269 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270 except IndexError:
271 pass
272
273 # Make sure that assignment fails
274 try:
275 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200276 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000277 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000278 pass
279
280 try:
281 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200282 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000283 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 pass
285
286 try:
287 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200288 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000289 except AttributeError:
290 pass
291
292 # Use the stat_result constructor with a too-short tuple.
293 try:
294 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200295 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000296 except TypeError:
297 pass
298
Ezio Melotti42da6632011-03-15 05:18:48 +0200299 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000300 try:
301 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
302 except TypeError:
303 pass
304
Antoine Pitrou38425292010-09-21 18:19:07 +0000305 def test_stat_attributes(self):
306 self.check_stat_attributes(self.fname)
307
308 def test_stat_attributes_bytes(self):
309 try:
310 fname = self.fname.encode(sys.getfilesystemencoding())
311 except UnicodeEncodeError:
312 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100313 with warnings.catch_warnings():
314 warnings.simplefilter("ignore", DeprecationWarning)
315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000329 try:
330 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000331 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000332 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000333 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200334 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000335
336 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000337 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000338
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000339 # Make sure all the attributes are there.
340 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
341 'ffree', 'favail', 'flag', 'namemax')
342 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000344
345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
374 try:
375 result = os.statvfs(self.fname)
376 except OSError as e:
377 # On AtheOS, glibc always returns ENOSYS
378 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200379 self.skipTest('os.statvfs() failed with ENOSYS')
380
Serhiy Storchakabad12572014-12-15 14:03:42 +0200381 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
382 p = pickle.dumps(result, proto)
383 self.assertIn(b'statvfs_result', p)
384 if proto < 4:
385 self.assertIn(b'cos\nstatvfs_result\n', p)
386 unpickled = pickle.loads(p)
387 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200388
Serhiy Storchaka43767632013-11-03 21:31:38 +0200389 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
390 def test_1686475(self):
391 # Verify that an open file can be stat'ed
392 try:
393 os.stat(r"c:\pagefile.sys")
394 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600395 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200396 except OSError as e:
397 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000398
Serhiy Storchaka43767632013-11-03 21:31:38 +0200399 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
400 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
401 def test_15261(self):
402 # Verify that stat'ing a closed fd does not cause crash
403 r, w = os.pipe()
404 try:
405 os.stat(r) # should not raise error
406 finally:
407 os.close(r)
408 os.close(w)
409 with self.assertRaises(OSError) as ctx:
410 os.stat(r)
411 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100412
Zachary Ware63f277b2014-06-19 09:46:37 -0500413 def check_file_attributes(self, result):
414 self.assertTrue(hasattr(result, 'st_file_attributes'))
415 self.assertTrue(isinstance(result.st_file_attributes, int))
416 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
417
418 @unittest.skipUnless(sys.platform == "win32",
419 "st_file_attributes is Win32 specific")
420 def test_file_attributes(self):
421 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
422 result = os.stat(self.fname)
423 self.check_file_attributes(result)
424 self.assertEqual(
425 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
426 0)
427
428 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
429 result = os.stat(support.TESTFN)
430 self.check_file_attributes(result)
431 self.assertEqual(
432 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
433 stat.FILE_ATTRIBUTE_DIRECTORY)
434
Victor Stinner138adb82015-06-12 22:01:54 +0200435
436class UtimeTests(unittest.TestCase):
437 def setUp(self):
438 self.dirname = support.TESTFN
439 self.fname = os.path.join(self.dirname, "f1")
440
441 self.addCleanup(support.rmtree, self.dirname)
442 os.mkdir(self.dirname)
443 with open(self.fname, 'wb') as fp:
444 fp.write(b"ABC")
445
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200446 def restore_float_times(state):
447 with warnings.catch_warnings():
448 warnings.simplefilter("ignore", DeprecationWarning)
449
450 os.stat_float_times(state)
451
Victor Stinner138adb82015-06-12 22:01:54 +0200452 # ensure that st_atime and st_mtime are float
453 with warnings.catch_warnings():
454 warnings.simplefilter("ignore", DeprecationWarning)
455
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200456 old_float_times = os.stat_float_times(-1)
457 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner138adb82015-06-12 22:01:54 +0200458
459 os.stat_float_times(True)
460
461 def support_subsecond(self, filename):
462 # Heuristic to check if the filesystem supports timestamp with
463 # subsecond resolution: check if float and int timestamps are different
464 st = os.stat(filename)
465 return ((st.st_atime != st[7])
466 or (st.st_mtime != st[8])
467 or (st.st_ctime != st[9]))
468
469 def _test_utime(self, set_time, filename=None):
470 if not filename:
471 filename = self.fname
472
473 support_subsecond = self.support_subsecond(filename)
474 if support_subsecond:
475 # Timestamp with a resolution of 1 microsecond (10^-6).
476 #
477 # The resolution of the C internal function used by os.utime()
478 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
479 # test with a resolution of 1 ns requires more work:
480 # see the issue #15745.
481 atime_ns = 1002003000 # 1.002003 seconds
482 mtime_ns = 4005006000 # 4.005006 seconds
483 else:
484 # use a resolution of 1 second
485 atime_ns = 5 * 10**9
486 mtime_ns = 8 * 10**9
487
488 set_time(filename, (atime_ns, mtime_ns))
489 st = os.stat(filename)
490
491 if support_subsecond:
492 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
493 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
494 else:
495 self.assertEqual(st.st_atime, atime_ns * 1e-9)
496 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
497 self.assertEqual(st.st_atime_ns, atime_ns)
498 self.assertEqual(st.st_mtime_ns, mtime_ns)
499
500 def test_utime(self):
501 def set_time(filename, ns):
502 # test the ns keyword parameter
503 os.utime(filename, ns=ns)
504 self._test_utime(set_time)
505
506 @staticmethod
507 def ns_to_sec(ns):
508 # Convert a number of nanosecond (int) to a number of seconds (float).
509 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
510 # issue, os.utime() rounds towards minus infinity.
511 return (ns * 1e-9) + 0.5e-9
512
513 def test_utime_by_indexed(self):
514 # pass times as floating point seconds as the second indexed parameter
515 def set_time(filename, ns):
516 atime_ns, mtime_ns = ns
517 atime = self.ns_to_sec(atime_ns)
518 mtime = self.ns_to_sec(mtime_ns)
519 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
520 # or utime(time_t)
521 os.utime(filename, (atime, mtime))
522 self._test_utime(set_time)
523
524 def test_utime_by_times(self):
525 def set_time(filename, ns):
526 atime_ns, mtime_ns = ns
527 atime = self.ns_to_sec(atime_ns)
528 mtime = self.ns_to_sec(mtime_ns)
529 # test the times keyword parameter
530 os.utime(filename, times=(atime, mtime))
531 self._test_utime(set_time)
532
533 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
534 "follow_symlinks support for utime required "
535 "for this test.")
536 def test_utime_nofollow_symlinks(self):
537 def set_time(filename, ns):
538 # use follow_symlinks=False to test utimensat(timespec)
539 # or lutimes(timeval)
540 os.utime(filename, ns=ns, follow_symlinks=False)
541 self._test_utime(set_time)
542
543 @unittest.skipUnless(os.utime in os.supports_fd,
544 "fd support for utime required for this test.")
545 def test_utime_fd(self):
546 def set_time(filename, ns):
547 with open(filename, 'wb') as fp:
548 # use a file descriptor to test futimens(timespec)
549 # or futimes(timeval)
550 os.utime(fp.fileno(), ns=ns)
551 self._test_utime(set_time)
552
553 @unittest.skipUnless(os.utime in os.supports_dir_fd,
554 "dir_fd support for utime required for this test.")
555 def test_utime_dir_fd(self):
556 def set_time(filename, ns):
557 dirname, name = os.path.split(filename)
558 dirfd = os.open(dirname, os.O_RDONLY)
559 try:
560 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
561 os.utime(name, dir_fd=dirfd, ns=ns)
562 finally:
563 os.close(dirfd)
564 self._test_utime(set_time)
565
566 def test_utime_directory(self):
567 def set_time(filename, ns):
568 # test calling os.utime() on a directory
569 os.utime(filename, ns=ns)
570 self._test_utime(set_time, filename=self.dirname)
571
572 def _test_utime_current(self, set_time):
573 # Get the system clock
574 current = time.time()
575
576 # Call os.utime() to set the timestamp to the current system clock
577 set_time(self.fname)
578
579 if not self.support_subsecond(self.fname):
580 delta = 1.0
581 else:
582 # On Windows, the usual resolution of time.time() is 15.6 ms
583 delta = 0.020
584 st = os.stat(self.fname)
585 msg = ("st_time=%r, current=%r, dt=%r"
586 % (st.st_mtime, current, st.st_mtime - current))
587 self.assertAlmostEqual(st.st_mtime, current,
588 delta=delta, msg=msg)
589
590 def test_utime_current(self):
591 def set_time(filename):
592 # Set to the current time in the new way
593 os.utime(self.fname)
594 self._test_utime_current(set_time)
595
596 def test_utime_current_old(self):
597 def set_time(filename):
598 # Set to the current time in the old explicit way.
599 os.utime(self.fname, None)
600 self._test_utime_current(set_time)
601
602 def get_file_system(self, path):
603 if sys.platform == 'win32':
604 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
605 import ctypes
606 kernel32 = ctypes.windll.kernel32
607 buf = ctypes.create_unicode_buffer("", 100)
608 ok = kernel32.GetVolumeInformationW(root, None, 0,
609 None, None, None,
610 buf, len(buf))
611 if ok:
612 return buf.value
613 # return None if the filesystem is unknown
614
615 def test_large_time(self):
616 # Many filesystems are limited to the year 2038. At least, the test
617 # pass with NTFS filesystem.
618 if self.get_file_system(self.dirname) != "NTFS":
619 self.skipTest("requires NTFS")
620
621 large = 5000000000 # some day in 2128
622 os.utime(self.fname, (large, large))
623 self.assertEqual(os.stat(self.fname).st_mtime, large)
624
625 def test_utime_invalid_arguments(self):
626 # seconds and nanoseconds parameters are mutually exclusive
627 with self.assertRaises(ValueError):
628 os.utime(self.fname, (5, 5), ns=(5, 5))
629
630
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000631from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000632
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000633class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000634 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000635 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000636
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000637 def setUp(self):
638 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000639 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000640 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000641 for key, value in self._reference().items():
642 os.environ[key] = value
643
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000644 def tearDown(self):
645 os.environ.clear()
646 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000647 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000648 os.environb.clear()
649 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000650
Christian Heimes90333392007-11-01 19:08:42 +0000651 def _reference(self):
652 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
653
654 def _empty_mapping(self):
655 os.environ.clear()
656 return os.environ
657
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000658 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300659 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000660 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000661 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300662 os.environ.update(HELLO="World")
663 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
664 value = popen.read().strip()
665 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000666
Ezio Melottic7e139b2012-09-26 20:01:34 +0300667 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000668 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300669 with os.popen(
670 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
671 it = iter(popen)
672 self.assertEqual(next(it), "line1\n")
673 self.assertEqual(next(it), "line2\n")
674 self.assertEqual(next(it), "line3\n")
675 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000676
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000677 # Verify environ keys and values from the OS are of the
678 # correct str type.
679 def test_keyvalue_types(self):
680 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000681 self.assertEqual(type(key), str)
682 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000683
Christian Heimes90333392007-11-01 19:08:42 +0000684 def test_items(self):
685 for key, value in self._reference().items():
686 self.assertEqual(os.environ.get(key), value)
687
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000688 # Issue 7310
689 def test___repr__(self):
690 """Check that the repr() of os.environ looks like environ({...})."""
691 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000692 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
693 '{!r}: {!r}'.format(key, value)
694 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000695
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000696 def test_get_exec_path(self):
697 defpath_list = os.defpath.split(os.pathsep)
698 test_path = ['/monty', '/python', '', '/flying/circus']
699 test_env = {'PATH': os.pathsep.join(test_path)}
700
701 saved_environ = os.environ
702 try:
703 os.environ = dict(test_env)
704 # Test that defaulting to os.environ works.
705 self.assertSequenceEqual(test_path, os.get_exec_path())
706 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
707 finally:
708 os.environ = saved_environ
709
710 # No PATH environment variable
711 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
712 # Empty PATH environment variable
713 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
714 # Supplied PATH environment variable
715 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
716
Victor Stinnerb745a742010-05-18 17:17:23 +0000717 if os.supports_bytes_environ:
718 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000719 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000720 # ignore BytesWarning warning
721 with warnings.catch_warnings(record=True):
722 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000723 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000724 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000725 pass
726 else:
727 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000728
729 # bytes key and/or value
730 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
731 ['abc'])
732 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
733 ['abc'])
734 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
735 ['abc'])
736
737 @unittest.skipUnless(os.supports_bytes_environ,
738 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000739 def test_environb(self):
740 # os.environ -> os.environb
741 value = 'euro\u20ac'
742 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000743 value_bytes = value.encode(sys.getfilesystemencoding(),
744 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000745 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000746 msg = "U+20AC character is not encodable to %s" % (
747 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000748 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000749 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(os.environ['unicode'], value)
751 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000752
753 # os.environb -> os.environ
754 value = b'\xff'
755 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000757 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000758 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000759
Charles-François Natali2966f102011-11-26 11:32:46 +0100760 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
761 # #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Check that deleting an invalid variable name raises the platform's error."""
    on_windows = sys.platform == "win32"
    if on_windows:
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, bad_key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100773
Victor Stinner6d101392013-04-14 16:35:04 +0200774 def test_key_type(self):
775 missing = 'missingkey'
776 self.assertNotIn(missing, os.environ)
777
Victor Stinner839e5ea2013-04-14 16:43:03 +0200778 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200779 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200780 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200781 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200782
Victor Stinner839e5ea2013-04-14 16:43:03 +0200783 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200784 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200785 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200786 self.assertTrue(cm.exception.__suppress_context__)
787
Victor Stinner6d101392013-04-14 16:35:04 +0200788
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    Every test obtains its walker through self.walk(), so a subclass can
    rerun the whole suite against a different walker by overriding that
    single method.
    """

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, top, **kwargs):
        """Call os.walk(), also accepting the 'follow_symlinks' spelling."""
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree described below under support.TESTFN."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # self.sub2_tree is the (root, dirs, files) triple the tests expect
        # for SUB2; it depends on whether symlinks could be created.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.  Go through self.walk() so that subclasses test
        # their own walker here as well.
        entries = list(self.walk(self.walk_path))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        entries[0][1].sort()
        entries[3 - 2 * flipped][-1].sort()
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(entries[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        entries = []
        for root, dirs, files in self.walk(self.walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to entries!
                dirs.remove('SUB1')

        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        entries[1][-1].sort()
        self.assertEqual(entries[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        entries = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        entries[3][1].sort()
        entries[2 - 2 * flipped][-1].sort()
        self.assertEqual(entries[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        # Rename the first subdirectory after it was listed but before the
        # walker descends into it; the walker must report an error and
        # carry on with the remaining directories.
        dir1 = dirs[0]
        dir1new = dir1 + '.new'
        os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
        roots = [r for r, d, f in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(os.path.join(root, dir1), roots)
        self.assertNotIn(os.path.join(root, dir1new), roots)
        for dir2 in dirs[1:]:
            self.assertIn(os.path.join(root, dir2), roots)
942
Charles-François Natali7372b062012-02-05 15:15:38 +0100943
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Yield os.fwalk() results as (root, dirs, files), dropping the fd."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are
        compared for every combination of topdown and symlink following.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that failure behind an unbound 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Entries listed as directories may be symlinks; those are
                # unlinked rather than rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1021
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def walk(self, top, **kwargs):
        """Walk *top* as a bytes path and yield str (root, dirs, files) triples."""
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            # Copy any changes the consumer made to the decoded lists back
            # into the bytes lists that came from os.walk().
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1034
Charles-François Natali7372b062012-02-05 15:15:38 +01001035
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a scratch directory at support.TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion below fails, so that a
        # failure here cannot change the umask seen by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails: tearDown calls
        # os.removedirs() on the deepest existing path, which cannot
        # remove a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1117
Andrew Svetlov405faed2012-12-25 12:18:09 +02001118
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory created at support.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # Numeric values that are not integers must be rejected in either
        # position.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for bad_value in non_index_values:
            with self.assertRaises(TypeError):
                os.chown(target, bad_value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, bad_value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        first_gid, second_gid = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in (first_gid, second_gid):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (first_uid, second_uid):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            # Both calls stay inside the block: the second one only runs
            # when the first did not raise.
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)
1171
1172
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_dira_dirb(self):
        """Create TESTFN/dira and TESTFN/dira/dirb; return both paths."""
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        dira, dirb = self._make_dira_dirb()
        os.removedirs(dirb)
        # Every directory in the chain was empty, so all of them go.
        for removed in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        dira, dirb = self._make_dira_dirb()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        # The file in dira stops the removal there.
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        dira, dirb = self._make_dira_dirb()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        # The leaf is not empty, so nothing is removed.
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1214
1215
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        """Check that os.devnull accepts writes and yields no data on read."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001223
Andrew Svetlov405faed2012-12-25 12:18:09 +02001224
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child interpreter."""
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Check against the requested size rather than a fixed 16, so the
        # helper is also correct for other values of count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001253
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001254
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the matching build configuration variables is 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001261
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for the file-descriptor based implementation of os.urandom().

    Each scenario runs in a child interpreter because it manipulates
    process-wide state (the descriptor limit or the open descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two reads must differ from each other, and a second run must
        # differ from the first; the file content is a constant b"x" run,
        # so identical output would mean the file was read instead.
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1334
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001335
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block so that the
    # finally clause only ever restores values that were really saved.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1368
Guido van Rossume7ba4952007-06-06 23:52:48 +00001369class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
                 "avoid triggering a linuxthreads bug: see issue #4970")
def test_execvpe_with_bad_program(self):
    """Check that os.execvpe() raises OSError for the program name 'no such app-'."""
    missing_program = 'no such app-'
    with self.assertRaises(OSError):
        os.execvpe(missing_program, [missing_program], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001375
Thomas Heller6790d602007-08-30 17:15:14 +00001376 def test_execvpe_with_bad_arglist(self):
1377 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1378
@unittest.skipUnless(hasattr(os, '_execvpe'),
                     "No internal os._execvpe function to test.")
def _test_internal_execvpe(self, test_type):
    """Check which exec call os._execvpe() makes for a str or bytes program.

    *test_type* is compared against ``bytes``; any other value selects the
    str variant.  The real exec functions are replaced by the recording
    stubs of _execvpe_mockup().
    """
    program_path = os.sep + 'absolutepath'
    use_bytes = test_type is bytes
    if use_bytes:
        program = b'executable'
        arguments = [b'progname', 'arg1', 'arg2']
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # Outside Windows the recorded path is expected in encoded form.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        expected_call = ('execve', native_fullpath, (arguments, env))
        self.assertSequenceEqual(calls[0], expected_call)

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        env_path = env.copy()
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path[path_key] = program_path
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        expected_call = ('execve', native_fullpath, (arguments, env_path))
        self.assertSequenceEqual(calls[0], expected_call)
1427
1428 def test_internal_execvpe_str(self):
1429 self._test_internal_execvpe(str)
1430 if os.name != "nt":
1431 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001432
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001433
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that failing path operations raise OSError on Windows.

    Apart from test_mkdir(), which creates the file itself, every test
    operates on support.TESTFN without creating it first.
    """

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail when a regular file already has that name.
        with contextlib.ExitStack() as stack:
            fobj = open(support.TESTFN, "w")
            # Callbacks run in reverse order: close the file first, then
            # delete it.
            stack.callback(os.unlink, support.TESTFN)
            stack.callback(fobj.close)
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001458
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid file descriptor."""

    # Functions that take the file descriptor as their only argument; one
    # test method per name is generated below.
    # "close" is deliberately not listed: it doesn't raise an exception on
    # some platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(f):
        # Build a test method for the os function named *f*.  The generated
        # test does nothing when the platform lacks that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper

    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            # fd + i is a valid descriptor: stop just before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based function are given the bad fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based function are given the bad fd.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        target = bytearray(10)
        self.check(os.readv, [target])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1557
Brian Curtin1b9df392010-11-24 20:24:31 +00001558
@unittest.skipUnless(hasattr(os, 'link'), 'test needs os.link()')
class LinkTests(unittest.TestCase):
    """Check that os.link() creates a second name for the same file.

    The whole class is skipped when the platform has no os.link(), in line
    with the hasattr() guards used by the other tests in this file.
    """

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two names a test actually created.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create *file1*, link it to *file2* and verify that both names
        # open the same underlying file.  Names may be str or bytes.
        with open(file1, "w") as f1:
            f1.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() is the conversion used throughout this file for
        # turning file names into bytes.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # tearDown() reads self.file1/self.file2, so the renamed paths are
        # cleaned up as well.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1595
Serhiy Storchaka43767632013-11-03 21:31:38 +02001596@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1597class PosixUidGidTests(unittest.TestCase):
1598 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1599 def test_setuid(self):
1600 if os.getuid() != 0:
1601 self.assertRaises(OSError, os.setuid, 0)
1602 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001603
Serhiy Storchaka43767632013-11-03 21:31:38 +02001604 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1605 def test_setgid(self):
1606 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1607 self.assertRaises(OSError, os.setgid, 0)
1608 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001609
Serhiy Storchaka43767632013-11-03 21:31:38 +02001610 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1611 def test_seteuid(self):
1612 if os.getuid() != 0:
1613 self.assertRaises(OSError, os.seteuid, 0)
1614 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001615
Serhiy Storchaka43767632013-11-03 21:31:38 +02001616 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1617 def test_setegid(self):
1618 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1619 self.assertRaises(OSError, os.setegid, 0)
1620 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001621
Serhiy Storchaka43767632013-11-03 21:31:38 +02001622 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1623 def test_setreuid(self):
1624 if os.getuid() != 0:
1625 self.assertRaises(OSError, os.setreuid, 0, 0)
1626 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1627 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001628
Serhiy Storchaka43767632013-11-03 21:31:38 +02001629 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1630 def test_setreuid_neg1(self):
1631 # Needs to accept -1. We run this in a subprocess to avoid
1632 # altering the test runner's process state (issue8045).
1633 subprocess.check_call([
1634 sys.executable, '-c',
1635 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001636
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1638 def test_setregid(self):
1639 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1640 self.assertRaises(OSError, os.setregid, 0, 0)
1641 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1642 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001643
Serhiy Storchaka43767632013-11-03 21:31:38 +02001644 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1645 def test_setregid_neg1(self):
1646 # Needs to accept -1. We run this in a subprocess to avoid
1647 # altering the test runner's process state (issue8045).
1648 subprocess.check_call([
1649 sys.executable, '-c',
1650 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001651
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise os functions on a directory of non-ASCII file names.

    setUp() creates the directory and one empty file per encodable name;
    self.unicodefn holds the decoded names the tests expect to find.
    """

    def setUp(self):
        # Pick the first non-empty candidate as the directory name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the bytes form of every candidate name that can be
        # encoded; names that cannot be encoded are dropped.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must simply succeed.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            fullname = os.path.join(self.dir, name)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001723
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Check os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_attempts * 0.1 seconds.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the loop ends without a break: either the attempts
            # ran out or the subprocess exited before writing anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # reports readiness through a named shared-memory byte, send it
        # *event* and check that it terminates.  *name* is only used in the
        # failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # The loop waits for the shared byte to become 1; presumably the
        # child script sets it once its handler is installed.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; a
        # plain truth test would also treat exit code 0 as "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1838
1839
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, remembering the sorted entry names for the tests.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listed, list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(listed, list(map(os.fsencode, self.created_paths)))
1884
1885
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Check symbolic links to files, directories and missing targets."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than ``assert`` so
        # that they are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # The missing link's target does not exist, so exists() would be
        # False for it; lexists() checks the link itself.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() must follow the link (same result as the target) while
        # lstat() must describe the link itself; checked for both str and
        # bytes link names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Register the cleanup before creating anything so that a failure
        # part-way through still removes whatever exists, and so that
        # cleanup never touches a name that was not bound yet.
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))

        with open(file1, "w") as f:
            f.write("file1")

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Leave the tree before the registered cleanup removes it.
            os.chdir(orig_dir)
2003
Brian Curtind40e6f72010-07-08 21:39:08 +00002004
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Check junction points created through _winapi.CreateJunction()."""

    # The junction name is relative, so it is created in the current
    # directory; it points at the directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions instead of ``assert`` so the preconditions are
        # still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        # lexists() looks at the junction itself, so it is also removed when
        # its target cannot be resolved.
        if os.path.lexists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2034
2035
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the backslash in the diagram below would otherwise
        # be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion instead of ``assert``: a bare assert is
        # removed under -O, which would leave this test checking nothing.
        self.assertTrue(os.path.isdir(src))
2066
2067
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back equal to the input.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002081
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002082
Brett Cannonefb00c02012-02-29 18:31:31 -05002083
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    # The condition is evaluated once, when the class body is executed.
    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        # The reported name must be a codec Python can look up.
        self.assertTrue(codecs.lookup(name))
2097
2098
class PidTests(unittest.TestCase):
    """Process-id tests: os.getppid() seen from a child, and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', code],
                                 stdout=subprocess.PIPE)
        out, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(out), os.getpid())

    def test_waitpid(self):
        cmd = [sys.executable, '-c', 'pass']
        child_pid = os.spawnv(os.P_NOWAIT, cmd[0], cmd)
        # The child runs ``pass``; expect its pid paired with status 0.
        self.assertEqual(os.waitpid(child_pid, 0), (child_pid, 0))
2114
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002115
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Sanity check for os.getlogin(); unconditionally skipped for now."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2124
2125
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which, who = os.PRIO_PROCESS, os.getpid()

        original = os.getpriority(which, who)
        os.setpriority(which, who, original + 1)
        try:
            current = os.getpriority(which, who)
            if original >= 19 and current <= 19:
                # Already at nice level 19 or above: the increment cannot
                # be observed reliably, so give up instead of failing.
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(current, original + 1)
        finally:
            # Best effort to put the original priority back; only a
            # refusal with EACCES is tolerated.
            try:
                os.setpriority(which, who, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2148
2149
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """asyncore-based TCP server that runs its loop in its own thread.

        An accepted connection is wrapped in a Handler, which greets the
        client with b"220 ready\\r\\n" and stores everything it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that collects all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []     # received chunks, in arrival order
                self.closed = False     # set once handle_close() has run
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Address actually bound, as reported by the socket.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and return once run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to finish and wait for the thread to exit."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # Hold the lock through a context manager so that it is
                # released even when asyncore.loop() raises (handle_error()
                # re-raises instead of swallowing exceptions).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2233
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002234
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending a file to a SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # headers/trailers are only exercised off Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        # The file sent by most tests; removed again in tearDownClass().
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET) propagates to the caller.  Returns
        the value returned by os.sendfile().  The defaults are immutable
        empty tuples rather than shared mutable lists.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # disconnected (ECONNRESET) or any other error
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, hence the dict unpacking).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Hashes are compared instead of the payloads themselves --
        # presumably to keep failure output small; confirm before changing.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before the file is created so that it is
        # removed even if writing or reopening it fails; support.unlink()
        # is also used by tearDownClass() for the main test file.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated; anything else is an error.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002421
2422
def supports_extended_attributes():
    """Return True if usable extended attribute support is available.

    False is returned when os.setxattr is missing, when setting a
    "user.test" attribute on a scratch file fails with OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        # Always remove the scratch file, whatever the probe's outcome.
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so that "\d" is a regex class rather than an invalid string
    # escape; the dots are escaped so they only match literal dots.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2438
2439
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr on a scratch file."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # ``s`` converts an attribute name to the type under test; the four
        # callables are the functions (or wrappers) being exercised and
        # ``kwargs`` is forwarded to get/set/remove.
        path = support.TESTFN
        open(path, "wb").close()

        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as ctx:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(ctx.exception.errno, errno.ENODATA)

        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create the attribute with an empty value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")

        # XATTR_REPLACE works on an existing attribute ...
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # ... XATTR_CREATE refuses to overwrite it ...
        with self.assertRaises(OSError) as ctx:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(ctx.exception.errno, errno.EEXIST)

        # ... and XATTR_REPLACE refuses to create a missing one.
        with self.assertRaises(OSError) as ctx:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(ctx.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing one attribute must leave the other untouched.
        removexattr(path, s("user.test"), **kwargs)
        with self.assertRaises(OSError) as ctx:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(ctx.exception.errno, errno.ENODATA)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario once with str names and once with bytes names,
        # starting from a fresh file each time.
        def make_bytes(s):
            return bytes(s, "ascii")

        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers with the path-based signature that operate on a file
        # descriptor opened just for the duration of the call.
        def getxattr(path, *args):
            with open(path, "rb") as fh:
                return os.getxattr(fh.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as fh:
                os.setxattr(fh.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as fh:
                os.removexattr(fh.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as fh:
                return os.listxattr(fh.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2515
2516
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Calls with bytes filenames must emit DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        path = os.fsencode(support.TESTFN)
        # Each entry is (function, *positional arguments).
        calls = (
            (nt._getfullpathname, path),
            (nt._isdir, path),
            (os.access, path, os.R_OK),
            (os.chdir, path),
            (os.chmod, path, 0o777),
            (os.getcwdb,),
            (os.link, path, path),
            (os.listdir, path),
            (os.lstat, path),
            (os.mkdir, path),
            (os.open, path, os.O_RDONLY),
            (os.rename, path, path),
            (os.rmdir, path),
            (os.startfile, path),
            (os.stat, path),
            (os.unlink, path),
            (os.utime, path),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so that it can be asserted.
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        path = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning, os.symlink, path, path)
2552
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002553
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _query_or_skip(self, query):
        # Call ``query()`` and return its result.  An OSError skips the test
        # on win32, or when its errno is EINVAL or ENOTTY; any other OSError
        # is re-raised.
        try:
            return query()
        except OSError as exc:
            if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._query_or_skip(lambda: os.get_terminal_size())

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        # The fileno() lookup stays inside the guarded call.
        actual = self._query_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
2596
2597
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to os functions."""

    def setUp(self):
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        # Each name comes in two flavours: the plain object, plus a str
        # subclass instance or a memoryview of it.
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        names = self.filenames
        bytes_names = self.bytes_filenames
        str_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry is (filenames, function, *extra positional arguments).
        funcs = [
            (names, os.chdir,),
            (names, os.chmod, 0o777),
            (names, os.lstat,),
            (names, os.open, os.O_RDONLY),
            (names, os.rmdir,),
            (names, os.stat,),
            (names, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (str_names, os.rename, "dst"),
                (str_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (str_names, os.listdir,),
            ]
        else:
            funcs += [
                (names, os.listdir,),
                (names, os.rename, "dst"),
                (names, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra
        # positional arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((names, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((str_names, os.link, "dst"))
            else:
                funcs.append((names, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (names, os.listxattr,),
                (names, os.getxattr, "user.test"),
                (names, os.setxattr, "user.test", b'user'),
                (names, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((names, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((str_names if on_windows else names, os.readlink,))

        for candidates, func, *extra_args in funcs:
            for candidate in candidates:
                try:
                    func(candidate, *extra_args)
                except OSError as exc:
                    # Identity, not equality: the exception must carry the
                    # very object that was passed in.
                    self.assertIs(exc.filename, candidate)
                else:
                    self.fail("No exception thrown by {}".format(func))
2694
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # None means the count is unavailable; nothing to verify.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2703
Victor Stinnerdaf45552013-08-28 00:53:59 +02002704
2705class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002706 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002707 fd = os.open(__file__, os.O_RDONLY)
2708 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002709 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002710
Victor Stinnerdaf45552013-08-28 00:53:59 +02002711 os.set_inheritable(fd, True)
2712 self.assertEqual(os.get_inheritable(fd), True)
2713
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002714 @unittest.skipIf(fcntl is None, "need fcntl")
2715 def test_get_inheritable_cloexec(self):
2716 fd = os.open(__file__, os.O_RDONLY)
2717 self.addCleanup(os.close, fd)
2718 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002719
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002720 # clear FD_CLOEXEC flag
2721 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2722 flags &= ~fcntl.FD_CLOEXEC
2723 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002724
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002725 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002726
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002727 @unittest.skipIf(fcntl is None, "need fcntl")
2728 def test_set_inheritable_cloexec(self):
2729 fd = os.open(__file__, os.O_RDONLY)
2730 self.addCleanup(os.close, fd)
2731 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2732 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002733
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002734 os.set_inheritable(fd, True)
2735 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2736 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002737
Victor Stinnerdaf45552013-08-28 00:53:59 +02002738 def test_open(self):
2739 fd = os.open(__file__, os.O_RDONLY)
2740 self.addCleanup(os.close, fd)
2741 self.assertEqual(os.get_inheritable(fd), False)
2742
@unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
def test_pipe(self):
    # Both ends of a pipe created by os.pipe() are expected to be
    # non-inheritable.
    pipe_fds = os.pipe()
    for pipe_fd in pipe_fds:
        self.addCleanup(os.close, pipe_fd)
    for pipe_fd in pipe_fds:
        self.assertEqual(os.get_inheritable(pipe_fd), False)
2750
def test_dup(self):
    # The duplicate returned by os.dup() is expected to be non-inheritable.
    original = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, original)

    duplicate = os.dup(original)
    self.addCleanup(os.close, duplicate)

    inheritable = os.get_inheritable(duplicate)
    self.assertEqual(inheritable, False)
2758
@unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
def test_dup2(self):
    fd = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, fd)

    # First pass: no keyword, the target is expected to be inheritable by
    # default.  Second pass: inheritable=False forces non-inheritable.
    scenarios = (
        ({}, True),
        ({'inheritable': False}, False),
    )
    for dup2_kwargs, expected in scenarios:
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(fd, target, **dup2_kwargs)
            self.assertEqual(os.get_inheritable(target), expected)
        finally:
            os.close(target)
2779
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
def test_openpty(self):
    # Both descriptors returned by os.openpty() are expected to be
    # non-inheritable.
    pty_fds = os.openpty()
    for pty_fd in pty_fds:
        self.addCleanup(os.close, pty_fd)
    for pty_fd in pty_fds:
        self.assertEqual(os.get_inheritable(pty_fd), False)
2787
2788
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)

        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(descriptor), True)

        # Switch to non-blocking and back, reading the mode after each change.
        for mode in (False, True):
            os.set_blocking(descriptor, mode)
            self.assertEqual(os.get_blocking(descriptor), mode)
2802
2803
Yury Selivanov97e2e062014-09-26 12:33:06 -04002804
class ExportsTests(unittest.TestCase):
    """Checks on the public names listed in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2809
2810
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the DirEntry objects it yields."""

    def setUp(self):
        # Every test works inside its own scratch directory; a registered
        # cleanup removes it again.
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create a small file called *name* in the test directory.

        Return the full path of the new file.
        """
        filename = os.path.join(self.path, name)
        with open(filename, "wb") as fp:
            fp.write(b'python')
        return filename

    def get_entries(self, names):
        """Scan the test directory and return a {name: DirEntry} mapping.

        *names* is the sorted list of entry names the directory is expected
        to contain; the test fails if the actual names differ.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        If *skip_fields* is true, the st_* attributes are compared one by
        one and st_dev, st_ino and st_nlink are ignored (callers request
        this on Windows); otherwise both objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check a DirEntry against expectations and against os.stat().

        *name* is the expected entry name; *is_dir*, *is_file* and
        *is_symlink* are the expected results of the corresponding
        DirEntry methods (following symbolic links for the first two).
        """
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Check against the expectations stated by the caller ...
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # ... and cross-check against what the os module reports.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # Maps each entry name to its expected (is_dir, is_file, is_symlink).
        expected = {
            'dir': (True, False, False),
            'file.txt': (False, True, False),
        }
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expected['link_file.txt'] = (False, True, False)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected['symlink_dir'] = (True, False, True)
            expected['symlink_file.txt'] = (False, True, True)

        names = sorted(expected)
        entries = self.get_entries(names)
        for name in names:
            self.check_entry(entries[name], name, *expected[name])

    def get_entry(self, name):
        """Return the only entry of the test directory, expected to be *name*."""
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        """Create the default file and return its DirEntry."""
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            found = sorted(entry.name for entry in os.scandir())
            self.assertEqual(found, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, the is_dir() result depends on whether scandir() filled
        # d_type or not, so it is only checked on Windows.
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, the is_file() result depends on whether scandir() filled
        # d_type or not, so it is only checked on Windows.
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises unittest.SkipTest, so nothing below runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target turns the symlink into a broken one.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes path must produce bytes names and bytes paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            # subTest() reports which object failed and keeps checking
            # the remaining ones.
            with self.subTest(obj=obj):
                self.assertRaises(TypeError, os.scandir, obj)
3033
3034
# Run every test case defined in this module when the file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main()